Class | Stomp::Connection |
In: |
lib/stomp.rb
|
Parent: | Object |
Low level connection which maps commands and supports synchronous receives
Create a connection, requires a login and passcode. Can accept a host (default is localhost), and port (default is 61613) to connect to
# File lib/stomp.rb, line 33 33: def initialize(login, passcode, host='localhost', port=61613, reliable=false, reconnectDelay=5) 34: @host = host 35: @port = port 36: @login = login 37: @passcode = passcode 38: @transmit_semaphore = Mutex.new 39: @read_semaphore = Mutex.new 40: @socket_semaphore = Mutex.new 41: @reliable = reliable 42: @reconnectDelay = reconnectDelay 43: @closed = FALSE 44: @subscriptions = {} 45: @failure = NIL 46: socket 47: end
# File lib/stomp.rb, line 26 26: def Connection.open(login = "", passcode = "", host='localhost', port=61613, reliable=FALSE, reconnectDelay=5) 27: Connection.new login, passcode, host, port, reliable, reconnectDelay 28: end
Receive a frame, block until the frame is received
# File lib/stomp.rb, line 156 156: def __old_receive 157: # The recive my fail so we may need to retry. 158: while TRUE 159: begin 160: s = socket 161: return _receive(s) 162: rescue 163: @failure = $!; 164: raise unless @reliable 165: $stderr.print "receive failed: " + $!; 166: end 167: end 168: end
Abort a transaction by name
# File lib/stomp.rb, line 106 106: def abort name, headers={} 107: headers[:transaction] = name 108: transmit "ABORT", headers 109: end
Acknowledge a message, used then a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client‘g
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 94 94: def ack message_id, headers={} 95: headers['message-id'] = message_id 96: transmit "ACK", headers 97: end
Begin a transaction, requires a name for the transaction
# File lib/stomp.rb, line 85 85: def begin name, headers={} 86: headers[:transaction] = name 87: transmit "BEGIN", headers 88: end
Commit a transaction by name
# File lib/stomp.rb, line 100 100: def commit name, headers={} 101: headers[:transaction] = name 102: transmit "COMMIT", headers 103: end
Close this connection
# File lib/stomp.rb, line 142 142: def disconnect(headers = {}) 143: transmit "DISCONNECT", headers 144: end
Return a pending message if one is available, otherwise return nil
# File lib/stomp.rb, line 148 148: def poll 149: @read_semaphore.synchronize do 150: return nil if @socket==NIL or !@socket.ready? 151: return receive 152: end 153: end
# File lib/stomp.rb, line 170 170: def receive 171: super_result = __old_receive() 172: if super_result.nil? && @reliable 173: $stderr.print "connection.receive returning EOF as nil - resetting connection.\n" 174: @socket = nil 175: super_result = __old_receive() 176: end 177: return super_result 178: end
Send message to destination
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 136 136: def send(destination, message, headers={}) 137: headers[:destination] = destination 138: transmit "SEND", headers, message 139: end
# File lib/stomp.rb, line 49 49: def socket 50: # Need to look into why the following synchronize does not work. 51: #@read_semaphore.synchronize do 52: s = @socket; 53: while s == NIL or @failure != NIL 54: @failure = NIL 55: begin 56: s = TCPSocket.open @host, @port 57: _transmit(s, "CONNECT", {:login => @login, :passcode => @passcode}) 58: @connect = _receive(s) 59: # replay any subscriptions. 60: @subscriptions.each { |k,v| _transmit(s, "SUBSCRIBE", v) } 61: rescue 62: @failure = $!; 63: s=NIL; 64: raise unless @reliable 65: $stderr.print "connect failed: " + $! +" will retry in #{@reconnectDelay}\n"; 66: sleep(@reconnectDelay); 67: end 68: end 69: @socket = s 70: return s; 71: #end 72: end
Subscribe to a destination, must specify a name
# File lib/stomp.rb, line 112 112: def subscribe(name, headers = {}, subId=NIL) 113: headers[:destination] = name 114: transmit "SUBSCRIBE", headers 115: 116: # Store the sub so that we can replay if we reconnect. 117: if @reliable 118: subId = name if subId==NIL 119: @subscriptions[subId]=headers 120: end 121: end
Unsubscribe from a destination, must specify a name
# File lib/stomp.rb, line 124 124: def unsubscribe(name, headers = {}, subId=NIL) 125: headers[:destination] = name 126: transmit "UNSUBSCRIBE", headers 127: if @reliable 128: subId = name if subId==NIL 129: @subscriptions.delete(subId) 130: end 131: end
# File lib/stomp.rb, line 181 181: def _receive( s ) 182: line = ' ' 183: @read_semaphore.synchronize do 184: line = s.gets while line =~ /^\s*$/ 185: return NIL if line == NIL 186: Message.new do |m| 187: m.command = line.chomp 188: m.headers = {} 189: until (line = s.gets.chomp) == '' 190: k = (line.strip[0, line.strip.index(':')]).strip 191: v = (line.strip[line.strip.index(':') + 1, line.strip.length]).strip 192: m.headers[k] = v 193: end 194: 195: if (m.headers['content-length']) 196: m.body = s.read m.headers['content-length'].to_i 197: c = s.getc 198: raise "Invalid content length received" unless c == 0 199: else 200: m.body = '' 201: until (c = s.getc) == 0 202: m.body << c.chr 203: end 204: end 205: #c = s.getc 206: #raise "Invalid frame termination received" unless c == 10 207: end 208: end 209: end
# File lib/stomp.rb, line 228 228: def _transmit(s, command, headers={}, body='') 229: @transmit_semaphore.synchronize do 230: s.puts command 231: headers.each {|k,v| s.puts "#{k}:#{v}" } 232: s.puts "content-length: #{body.length}" 233: s.puts "content-type: text/plain; charset=UTF-8" 234: s.puts 235: s.write body 236: s.write "\0" 237: end 238: end
# File lib/stomp.rb, line 212 212: def transmit(command, headers={}, body='') 213: # The transmit my fail so we may need to retry. 214: while TRUE 215: begin 216: s = socket 217: _transmit(s, command, headers, body) 218: return 219: rescue 220: @failure = $!; 221: raise unless @reliable 222: $stderr.print "transmit failed: " + $!+"\n"; 223: end 224: end 225: end