Class | Stomp::Client |
In: |
lib/stomp/client.rb
|
Parent: | Object |
host | [R] | |
login | [R] | |
parameters | [R] | |
passcode | [R] | |
port | [R] | |
reliable | [R] |
A new Client object can be initialized using two forms:
Standard positional parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://login:passcode@host:port stomp://login:passcode@host.domain.tld:port
# File lib/stomp/client.rb, line 36 36: def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) 37: 38: # Parse stomp:// URL's or set params 39: if login.is_a?(Hash) 40: @parameters = login 41: 42: first_host = @parameters[:hosts][0] 43: 44: @login = first_host[:login] 45: @passcode = first_host[:passcode] 46: @host = first_host[:host] 47: @port = first_host[:port] || Connection::default_port(first_host[:ssl]) 48: 49: @reliable = true 50: 51: elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port 52: @login = $2 || "" 53: @passcode = $3 || "" 54: @host = $4 55: @port = $5.to_i 56: @reliable = false 57: elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param 58: 59: first_host = {} 60: first_host[:ssl] = !$2.nil? 61: @login = first_host[:login] = $4 || "" 62: @passcode = first_host[:passcode] = $5 || "" 63: @host = first_host[:host] = $6 64: @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl]) 65: 66: options = $16 || "" 67: parts = options.split(/&|=/) 68: options = Hash[*parts] 69: 70: hosts = [first_host] + parse_hosts(login) 71: 72: @parameters = {} 73: @parameters[:hosts] = hosts 74: 75: @parameters.merge! filter_options(options) 76: 77: @reliable = true 78: else 79: @login = login 80: @passcode = passcode 81: @host = host 82: @port = port.to_i 83: @reliable = reliable 84: end 85: 86: check_arguments! 87: 88: @id_mutex = Mutex.new 89: @ids = 1 90: 91: if @parameters 92: @connection = Connection.new(@parameters) 93: else 94: @connection = Connection.new(@login, @passcode, @host, @port, @reliable) 95: end 96: 97: start_listeners 98: 99: end
Syntactic sugar for ‘Client.new’ See ‘initialize’ for usage.
# File lib/stomp/client.rb, line 102 102: def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) 103: Client.new(login, passcode, host, port, reliable) 104: end
Abort a transaction by name
# File lib/stomp/client.rb, line 118 118: def abort(name, headers = {}) 119: @connection.abort(name, headers) 120: 121: # lets replay any ack'd messages in this transaction 122: replay_list = @replay_messages_by_txn[name] 123: if replay_list 124: replay_list.each do |message| 125: if listener = find_listener(message) 126: listener.call(message) 127: end 128: end 129: end 130: end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client‘g
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 167 167: def acknowledge(message, headers = {}) 168: txn_id = headers[:transaction] 169: if txn_id 170: # lets keep around messages ack'd in this transaction in case we rollback 171: replay_list = @replay_messages_by_txn[txn_id] 172: if replay_list.nil? 173: replay_list = [] 174: @replay_messages_by_txn[txn_id] = replay_list 175: end 176: replay_list << message 177: end 178: if block_given? 179: headers['receipt'] = register_receipt_listener lambda {|r| yield r} 180: end 181: @connection.ack message.headers['message-id'], headers 182: end
Begin a transaction by name
# File lib/stomp/client.rb, line 113 113: def begin(name, headers = {}) 114: @connection.begin(name, headers) 115: end
Close out resources in use by this client
# File lib/stomp/client.rb, line 231 231: def close headers={} 232: @listener_thread.exit 233: @connection.disconnect headers 234: end
Commit a transaction by name
# File lib/stomp/client.rb, line 133 133: def commit(name, headers = {}) 134: txn_id = headers[:transaction] 135: @replay_messages_by_txn.delete(txn_id) 136: @connection.commit(name, headers) 137: end
# File lib/stomp/client.rb, line 212 212: def connection_frame 213: @connection.connection_frame 214: end
# File lib/stomp/client.rb, line 216 216: def disconnect_receipt 217: @connection.disconnect_receipt 218: end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp/client.rb, line 108 108: def join(limit = nil) 109: @listener_thread.join(limit) 110: end
Publishes message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 196 196: def publish(destination, message, headers = {}) 197: if block_given? 198: headers['receipt'] = register_receipt_listener lambda {|r| yield r} 199: end 200: @connection.publish(destination, message, headers) 201: end
Check if the thread was created and isn‘t dead
# File lib/stomp/client.rb, line 237 237: def running 238: @listener_thread && !!@listener_thread.status 239: end
# File lib/stomp/client.rb, line 207 207: def send(*args) 208: warn("This method is deprecated and will be removed on the next release. Use 'publish' instead") 209: publish(*args) 210: end
Subscribe to a destination, must be passed a block which will be used as a callback listener
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 143 143: def subscribe(destination, headers = {}) 144: raise "No listener given" unless block_given? 145: # use subscription id to correlate messages to subscription. As described in 146: # the SUBSCRIPTION section of the protocol: http://stomp.codehaus.org/Protocol. 147: # If no subscription id is provided, generate one. 148: set_subscription_id_if_missing(destination, headers) 149: if @listeners[headers[:id]] 150: raise "attempting to subscribe to a queue with a previous subscription" 151: end 152: @listeners[headers[:id]] = lambda {|msg| yield msg} 153: @connection.subscribe(destination, headers) 154: end
Unreceive a message, sending it back to its queue or to the DLQ
# File lib/stomp/client.rb, line 186 186: def unreceive(message, options = {}) 187: @connection.unreceive(message, options) 188: end
Unsubecribe from a channel
# File lib/stomp/client.rb, line 157 157: def unsubscribe(name, headers = {}) 158: set_subscription_id_if_missing(name, headers) 159: @connection.unsubscribe(name, headers) 160: @listeners[headers[:id]] = nil 161: end
# File lib/stomp/client.rb, line 286 286: def check_arguments! 287: raise ArgumentError if @host.nil? || @host.empty? 288: raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535 289: raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass) 290: end
# File lib/stomp/client.rb, line 292 292: def filter_options(options) 293: new_options = {} 294: new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms 295: new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms 296: new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true 297: new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i 298: new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i 299: new_options[:randomize] = options["randomize"] == "true" # Default: false 300: new_options[:backup] = false # Not implemented yet: I'm using a master X slave solution 301: new_options[:timeout] = -1 # Not implemented yet: a "timeout(5) do ... end" would do the trick, feel free 302: 303: new_options 304: end
# File lib/stomp/client.rb, line 306 306: def find_listener(message) 307: subscription_id = message.headers['subscription'] 308: if subscription_id == nil 309: # For backward compatibility, some messages may already exist with no 310: # subscription id, in which case we can attempt to synthesize one. 311: set_subscription_id_if_missing(message.headers['destination'], message.headers) 312: subscription_id = message.headers['id'] 313: end 314: @listeners[subscription_id] 315: end
# File lib/stomp/client.rb, line 268 268: def parse_hosts(url) 269: hosts = [] 270: 271: host_match = /stomp(\+ssl)?:\/\/(([\w\.]*):(\w*)@)?([\w\.]+):(\d+)\)/ 272: url.scan(host_match).each do |match| 273: host = {} 274: host[:ssl] = !match[0].nil? 275: host[:login] = match[2] || "" 276: host[:passcode] = match[3] || "" 277: host[:host] = match[4] 278: host[:port] = match[5].to_i 279: 280: hosts << host 281: end 282: 283: hosts 284: end
# File lib/stomp/client.rb, line 253 253: def register_receipt_listener(listener) 254: id = -1 255: @id_mutex.synchronize do 256: id = @ids.to_s 257: @ids = @ids.succ 258: end 259: @receipt_listeners[id] = listener 260: id 261: end
Set a subscription id in the headers hash if one does not already exist. For simplicities sake, all subscriptions have a subscription ID. setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.codehaus.org/Protocol
# File lib/stomp/client.rb, line 246 246: def set_subscription_id_if_missing(destination, headers) 247: headers[:id] = headers[:id] ? headers[:id] : headers['id'] 248: if headers[:id] == nil 249: headers[:id] = Digest::SHA1.hexdigest(destination) 250: end 251: end
# File lib/stomp/client.rb, line 317 317: def start_listeners 318: @listeners = {} 319: @receipt_listeners = {} 320: @replay_messages_by_txn = {} 321: 322: @listener_thread = Thread.start do 323: while true 324: message = @connection.receive 325: if message.command == 'MESSAGE' 326: if listener = find_listener(message) 327: listener.call(message) 328: end 329: elsif message.command == 'RECEIPT' 330: if listener = @receipt_listeners[message.headers['receipt-id']] 331: listener.call(message) 332: end 333: end 334: end 335: end 336: 337: end