Class | Stomp::Client |
In: |
lib/stomp.rb
|
Parent: | Object |
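Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false). The first argument may instead be a URL of the form stomp://host:port or stomp://user:pass@host:port, in which case the connection details are taken from the URL.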
# File lib/stomp.rb, line 263
#
# Builds a client: opens the connection and starts the listener thread that
# hands incoming frames to the registered callbacks.
#
# The first argument is either a login name or a URL of the form
# stomp://host:port or stomp://login:passcode@host:port. When a URL is
# given, login, passcode, host and port are taken from it and reliable is
# forced to false.
#
# user     - login name, or a stomp:// URL (default "")
# pass     - passcode (default "")
# host     - broker host (default "localhost")
# port     - broker port (default 61613)
# reliable - passed through to Connection.open (default false)
def initialize(user = "", pass = "", host = "localhost", port = 61613, reliable = false)
  # The credentialed pattern must be tried first: the bare host:port pattern
  # is unanchored and would otherwise capture "login:123" out of
  # stomp://login:123@host:61613 whenever the passcode starts with digits.
  if user =~ /stomp:\/\/(\w+):(\w+)@(\w+):(\d+)/
    url = Regexp.last_match
    # Port is converted so it has the same type as the Integer default.
    user, pass, host, port = url[1], url[2], url[3], url[4].to_i
    reliable = false
  elsif user =~ /stomp:\/\/(\w+):(\d+)/
    url = Regexp.last_match
    user, pass, host, port = "", "", url[1], url[2].to_i
    reliable = false
  end

  @id_mutex = Mutex.new
  @ids = 1
  @connection = Connection.open(user, pass, host, port, reliable)
  @listeners = {}
  @receipt_listeners = {}
  @running = true
  @replay_messages_by_txn = {}

  @listener_thread = Thread.start do
    while @running
      message = @connection.receive
      # A nil frame ends the loop.
      break if message.nil?

      listener =
        case message.command
        when 'MESSAGE'
          @listeners[message.headers['destination']]
        when 'RECEIPT'
          @receipt_listeners[message.headers['receipt-id']]
        end
      # Frames with any other command, or with no registered callback, are dropped.
      listener.call(message) if listener
    end
  end
end
Shortcut for Client.new: accepts the same username (default ""), password (default ""), host (default localhost), port (default 61613), and reliable flag (default false), and returns the new client.
# File lib/stomp.rb, line 312
#
# Convenience constructor: forwards every argument, unchanged, to Client.new
# and returns the resulting client.
def self.open(user = "", pass = "", host = "localhost", port = 61613, reliable = false)
  Client.new(user, pass, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp.rb, line 322
#
# Aborts the named transaction on the connection, then re-delivers every
# message that was acknowledged inside it to the listener registered for
# the message's destination.
#
# name    - transaction name
# headers - extra headers forwarded to the connection (default {})
#
# Returns the list of replayed messages, or nil when nothing was recorded
# for the transaction.
def abort(name, headers = {})
  @connection.abort(name, headers)

  # Detach the list before replaying it. This releases the bookkeeping for
  # the finished transaction, and a listener that acknowledges again under
  # the same transaction name starts a fresh list instead of appending to
  # the one being iterated.
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  replay_list.each do |message|
    listener = @listeners[message.headers['destination']]
    listener.call(message) if listener
  end
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client’ )
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 363
#
# Acknowledges a message on the connection, using the message's
# 'message-id' header.
#
# When headers carries :transaction, the message is also remembered under
# that transaction id so it can be replayed if the transaction is rolled
# back. When a block is given, a receipt listener is registered and its id
# is placed in headers['receipt']; the block is called with the receipt.
#
# message - the message being acknowledged
# headers - extra headers forwarded to the connection (default {})
def acknowledge(message, headers = {})
  transaction = headers[:transaction]
  # Record the message for a possible rollback of its transaction.
  (@replay_messages_by_txn[transaction] ||= []) << message if transaction

  if block_given?
    on_receipt = lambda { |receipt| yield receipt }
    headers['receipt'] = register_receipt_listener(on_receipt)
  end

  @connection.ack(message.headers['message-id'], headers)
end
Begin a transaction by name
# File lib/stomp.rb, line 317
#
# Starts the named transaction by delegating to the connection.
#
# name    - transaction name
# headers - extra headers forwarded to the connection (default {})
def begin(name, headers = {})
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp.rb, line 399
#
# Stops the client: flags the listener loop to finish and disconnects the
# connection.
#
# Returns false.
def close
  # Clear the flag first so that the listener loop is told to stop even if
  # disconnect raises.
  @running = false
  @connection.disconnect
  # Evaluates to false, the value this method has always returned.
  @running
end
Commit a transaction by name
# File lib/stomp.rb, line 337
#
# Commits the named transaction on the connection and discards the messages
# that were being kept for replay in case of a rollback.
#
# name    - transaction name
# headers - extra headers forwarded to the connection (default {})
def commit(name, headers = {})
  # Acknowledged messages are stored under the transaction id, which is the
  # name being committed here, so that is the entry to drop.
  @replay_messages_by_txn.delete(name)
  # Kept for callers that also pass the id as a :transaction header.
  @replay_messages_by_txn.delete(headers[:transaction])
  @connection.commit(name, headers)
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp.rb, line 306
#
# Blocks the caller until the listener thread has finished, and returns
# whatever the thread's join returns.
def join
  listener = @listener_thread
  listener.join
end
Send message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 386
#
# Sends message to destination through the connection.
#
# When a block is given, a receipt listener is registered and its id is
# placed in headers['receipt']; the block is called with the receipt.
#
# destination - where the message is sent
# message     - the message body
# headers     - extra headers forwarded to the connection (default {})
def send(destination, message, headers = {})
  if block_given?
    on_receipt = lambda { |receipt| yield receipt }
    headers['receipt'] = register_receipt_listener(on_receipt)
  end

  @connection.send(destination, message, headers)
end
Subscribe to a destination, must be passed a block which will be used as a callback listener
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 347
#
# Registers the given block as the listener for destination, replacing any
# listener already stored for it, then subscribes on the connection.
#
# destination - destination to subscribe to
# headers     - extra headers forwarded to the connection (default {})
#
# Raises RuntimeError ("No listener given") when called without a block.
def subscribe(destination, headers = {})
  unless block_given?
    raise "No listener given"
  end

  callback = lambda { |msg| yield msg }
  @listeners[destination] = callback
  @connection.subscribe(destination, headers)
end
Unsubscribe from a channel
# File lib/stomp.rb, line 354
#
# Unsubscribes from the named destination on the connection and forgets the
# listener registered for it.
#
# name    - destination to unsubscribe from
# headers - extra headers forwarded to the connection (default {})
#
# Returns nil.
def unsubscribe(name, headers = {})
  @connection.unsubscribe(name, headers)
  # Remove the entry outright rather than storing nil, so the listener table
  # does not keep a dead key for every destination ever subscribed.
  @listeners.delete(name)
  nil
end