Class | Stomp::Connection |
In: |
lib/stomp/connection.rb
|
Parent: | Object |
Low level connection which maps commands and supports synchronous receives
connection_frame | [R] | |
disconnect_receipt | [R] |
A new Connection object accepts the following parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) reconnect_delay (Integer, default : 5) e.g. c = Connection.new("username", "password", "localhost", 61613, true)
Hash:
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :backup => false, :timeout => -1, :connect_headers => {}, :parse_timeout => 5, :logger => nil, } e.g. c = Connection.new(hash)
TODO Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://user:pass@host:port stomp://user:pass@host.domain.tld:port
# File lib/stomp/connection.rb, line 60 60: def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) 61: @received_messages = [] 62: 63: if login.is_a?(Hash) 64: hashed_initialize(login) 65: else 66: @host = host 67: @port = port 68: @login = login 69: @passcode = passcode 70: @reliable = reliable 71: @reconnect_delay = reconnect_delay 72: @connect_headers = connect_headers 73: @ssl = false 74: @parameters = nil 75: @parse_timeout = 5 # To override, use hashed parameters 76: @logger = nil # To override, use hashed parameters 77: end 78: 79: # Use Mutexes: only one lock per each thread 80: # Revert to original implementation attempt 81: @transmit_semaphore = Mutex.new 82: @read_semaphore = Mutex.new 83: @socket_semaphore = Mutex.new 84: 85: @subscriptions = {} 86: @failure = nil 87: @connection_attempts = 0 88: 89: socket 90: end
Syntactic sugar for ‘Connection.new’ See ‘initialize’ for usage.
# File lib/stomp/connection.rb, line 108 108: def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) 109: Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers) 110: end
Receive a frame, block until the frame is received
# File lib/stomp/connection.rb, line 340 340: def __old_receive 341: # The recive my fail so we may need to retry. 342: while TRUE 343: begin 344: used_socket = socket 345: return _receive(used_socket) 346: rescue 347: @failure = $! 348: raise unless @reliable 349: errstr = "receive failed: #{$!}" 350: if @logger && @logger.respond_to?(:on_miscerr) 351: @logger.on_miscerr(log_params, errstr) 352: else 353: $stderr.print errstr 354: end 355: end 356: end 357: end
Abort a transaction by name
# File lib/stomp/connection.rb, line 234 234: def abort(name, headers = {}) 235: headers[:transaction] = name 236: transmit("ABORT", headers) 237: end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client‘g
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/connection.rb, line 222 222: def ack(message_id, headers = {}) 223: headers['message-id'] = message_id 224: transmit("ACK", headers) 225: end
Begin a transaction, requires a name for the transaction
# File lib/stomp/connection.rb, line 213 213: def begin(name, headers = {}) 214: headers[:transaction] = name 215: transmit("BEGIN", headers) 216: end
# File lib/stomp/connection.rb, line 175 175: def change_host 176: @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize] 177: 178: # Set first as master and send it to the end of array 179: current_host = @parameters[:hosts].shift 180: @parameters[:hosts] << current_host 181: 182: @ssl = current_host[:ssl] 183: @host = current_host[:host] 184: @port = current_host[:port] || Connection::default_port(@ssl) 185: @login = current_host[:login] || "" 186: @passcode = current_host[:passcode] || "" 187: 188: end
# File lib/stomp/connection.rb, line 314 314: def client_ack?(message) 315: headers = @subscriptions[message.headers[:destination]] 316: !headers.nil? && headers[:ack] == "client" 317: end
Commit a transaction by name
# File lib/stomp/connection.rb, line 228 228: def commit(name, headers = {}) 229: headers[:transaction] = name 230: transmit("COMMIT", headers) 231: end
Close this connection
# File lib/stomp/connection.rb, line 320 320: def disconnect(headers = {}) 321: transmit("DISCONNECT", headers) 322: headers = headers.symbolize_keys 323: @disconnect_receipt = receive if headers[:receipt] 324: if @logger && @logger.respond_to?(:on_disconnect) 325: @logger.on_disconnect(log_params) 326: end 327: close_socket 328: end
# File lib/stomp/connection.rb, line 92 92: def hashed_initialize(params) 93: 94: @parameters = refine_params(params) 95: @reliable = true 96: @reconnect_delay = @parameters[:initial_reconnect_delay] 97: @connect_headers = @parameters[:connect_headers] 98: @parse_timeout = @parameters[:parse_timeout] 99: @logger = @parameters[:logger] 100: #sets the first host to connect 101: change_host 102: if @logger && @logger.respond_to?(:on_connecting) 103: @logger.on_connecting(log_params) 104: end 105: end
# File lib/stomp/connection.rb, line 194 194: def increase_reconnect_delay 195: 196: @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] 197: @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay] 198: 199: @reconnect_delay 200: end
# File lib/stomp/connection.rb, line 190 190: def max_reconnect_attempts? 191: !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts] 192: end
Return a pending message if one is available, otherwise return nil
# File lib/stomp/connection.rb, line 332 332: def poll 333: # No need for a read lock here. The receive method eventually fullfills 334: # that requirement. 335: return nil if @socket.nil? || !@socket.ready? 336: receive 337: end
Publish message to destination
To disable content length header ( :suppress_content_length => true ) Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/connection.rb, line 265 265: def publish(destination, message, headers = {}) 266: headers[:destination] = destination 267: transmit("SEND", headers, message) 268: end
# File lib/stomp/connection.rb, line 359 359: def receive 360: super_result = __old_receive 361: if super_result.nil? && @reliable 362: errstr = "connection.receive returning EOF as nil - resetting connection.\n" 363: if @logger && @logger.respond_to?(:on_miscerr) 364: @logger.on_miscerr(log_params, errstr) 365: else 366: $stderr.print errstr 367: end 368: @socket = nil 369: super_result = __old_receive 370: end 371: return super_result 372: end
# File lib/stomp/connection.rb, line 153 153: def refine_params(params) 154: params = params.uncamelize_and_symbolize_keys 155: 156: default_params = { 157: :connect_headers => {}, 158: # Failover parameters 159: :initial_reconnect_delay => 0.01, 160: :max_reconnect_delay => 30.0, 161: :use_exponential_back_off => true, 162: :back_off_multiplier => 2, 163: :max_reconnect_attempts => 0, 164: :randomize => false, 165: :backup => false, 166: :timeout => -1, 167: # Parse Timeout 168: :parse_timeout => 5 169: } 170: 171: default_params.merge(params) 172: 173: end
# File lib/stomp/connection.rb, line 274 274: def send(*args) 275: warn("This method is deprecated and will be removed on the next release. Use 'publish' instead") 276: publish(*args) 277: end
# File lib/stomp/connection.rb, line 112 112: def socket 113: @socket_semaphore.synchronize do 114: used_socket = @socket 115: used_socket = nil if closed? 116: 117: while used_socket.nil? || !@failure.nil? 118: @failure = nil 119: begin 120: used_socket = open_socket 121: # Open complete 122: 123: connect(used_socket) 124: if @logger && @logger.respond_to?(:on_connected) 125: @logger.on_connected(log_params) 126: end 127: @connection_attempts = 0 128: rescue 129: @failure = $! 130: used_socket = nil 131: raise unless @reliable 132: if @logger && @logger.respond_to?(:on_connectfail) 133: @logger.on_connectfail(log_params) 134: else 135: $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n" 136: end 137: raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts? 138: 139: sleep(@reconnect_delay) 140: 141: @connection_attempts += 1 142: 143: if @parameters 144: change_host 145: increase_reconnect_delay 146: end 147: end 148: end 149: @socket = used_socket 150: end 151: end
Subscribe to a destination, must specify a name
# File lib/stomp/connection.rb, line 240 240: def subscribe(name, headers = {}, subId = nil) 241: headers[:destination] = name 242: transmit("SUBSCRIBE", headers) 243: 244: # Store the sub so that we can replay if we reconnect. 245: if @reliable 246: subId = name if subId.nil? 247: @subscriptions[subId] = headers 248: end 249: end
Send a message back to the source or to the dead letter queue
Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" ) Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ) Accepts a force client acknowledgement option (:force_client_ack => true)
# File lib/stomp/connection.rb, line 284 284: def unreceive(message, options = {}) 285: options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options 286: # Lets make sure all keys are symbols 287: message.headers = message.headers.symbolize_keys 288: 289: retry_count = message.headers[:retry_count].to_i || 0 290: message.headers[:retry_count] = retry_count + 1 291: transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}" 292: message_id = message.headers.delete('message-id''message-id') 293: 294: begin 295: self.begin transaction_id 296: 297: if client_ack?(message) || options[:force_client_ack] 298: self.ack(message_id, :transaction => transaction_id) 299: end 300: 301: if retry_count <= options[:max_redeliveries] 302: self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id)) 303: else 304: # Poison ack, sending the message to the DLQ 305: self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true)) 306: end 307: self.commit transaction_id 308: rescue Exception => exception 309: self.abort transaction_id 310: raise exception 311: end 312: end
Unsubscribe from a destination, must specify a name
# File lib/stomp/connection.rb, line 252 252: def unsubscribe(name, headers = {}, subId = nil) 253: headers[:destination] = name 254: transmit("UNSUBSCRIBE", headers) 255: if @reliable 256: subId = name if subId.nil? 257: @subscriptions.delete(subId) 258: end 259: end
# File lib/stomp/connection.rb, line 376 376: def _receive( read_socket ) 377: @read_semaphore.synchronize do 378: line = read_socket.gets 379: 380: return nil if line.nil? 381: 382: # If the reading hangs for more than X seconds, abort the parsing process. 383: # X defaults to 5. Override allowed in connection hash parameters. 384: Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do 385: # Reads the beginning of the message until it runs into a empty line 386: message_header = '' 387: begin 388: message_header += line 389: line = read_socket.gets 390: end until line =~ /^\s?\n$/ 391: 392: # Checks if it includes content_length header 393: content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/ 394: message_body = '' 395: 396: # If it does, reads the specified amount of bytes 397: char = '' 398: if content_length 399: message_body = read_socket.read content_length[1].to_i 400: raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0" 401: # Else reads, the rest of the message until the first \0 402: else 403: message_body += char while (char = parse_char(read_socket.getc)) != "\0" 404: end 405: 406: # If the buffer isn't empty, reads trailing new lines. 407: # Note: experiments with JRuby seem to show that .ready? never 408: # returns true. This means that this code to drain trailing new 409: # lines never runs using JRuby. 410: while read_socket.ready? 411: last_char = read_socket.getc 412: break unless last_char 413: if parse_char(last_char) != "\n" 414: read_socket.ungetc(last_char) 415: break 416: end 417: end 418: # And so, a JRuby hack. Remove any new lines at the start of the 419: # next buffer. 420: message_header.gsub!(/^\n?/, "") 421: 422: # Adds the excluded \n and \0 and tries to create a new message with it 423: Message.new(message_header + "\n" + message_body + "\0") 424: end 425: end 426: end
# File lib/stomp/connection.rb, line 454 454: def _transmit(used_socket, command, headers = {}, body = '') 455: @transmit_semaphore.synchronize do 456: # Handle nil body 457: body = '' if body.nil? 458: # The content-length should be expressed in bytes. 459: # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters 460: # With Unicode strings, # of bytes != # of characters. So, use String#bytesize when available. 461: body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length 462: 463: # ActiveMQ interprets every message as a BinaryMessage 464: # if content_length header is included. 465: # Using :suppress_content_length => true will suppress this behaviour 466: # and ActiveMQ will interpret the message as a TextMessage. 467: # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/ 468: # Lets send this header in the message, so it can maintain state when using unreceive 469: headers['content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length] 470: 471: used_socket.puts command 472: headers.each {|k,v| used_socket.puts "#{k}:#{v}" } 473: used_socket.puts "content-type: text/plain; charset=UTF-8" 474: used_socket.puts 475: used_socket.write body 476: used_socket.write "\0" 477: end 478: end
# File lib/stomp/connection.rb, line 511 511: def close_socketclose_socket 512: begin 513: @socket.close 514: rescue 515: #Ignoring if already closed 516: end 517: 518: @closed = true 519: end
# File lib/stomp/connection.rb, line 532 532: def connect(used_socket) 533: headers = @connect_headers.clone 534: headers[:login] = @login 535: headers[:passcode] = @passcode 536: _transmit(used_socket, "CONNECT", headers) 537: @connection_frame = _receive(used_socket) 538: @disconnect_receipt = nil 539: # replay any subscriptions. 540: @subscriptions.each { |k,v| _transmit(used_socket, "SUBSCRIBE", v) } 541: end
# File lib/stomp/connection.rb, line 543 543: def log_params 544: lparms = @parameters.clone 545: lparms[:cur_host] = @host 546: lparms[:cur_port] = @port 547: lparms[:cur_login] = @login 548: lparms[:cur_passcode] = @passcode 549: lparms[:cur_ssl] = @ssl 550: lparms[:cur_recondelay] = @reconnect_delay 551: lparms[:cur_parseto] = @parse_timeout 552: lparms[:cur_conattempts] = @connection_attempts 553: # 554: lparms 555: end
# File lib/stomp/connection.rb, line 521 521: def open_socket 522: used_socket = @ssl ? open_ssl_socket : open_tcp_socket 523: # try to close the old connection if any 524: close_socket 525: 526: @closed = false 527: # Use keepalive 528: used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) 529: used_socket 530: end
# File lib/stomp/connection.rb, line 486 486: def open_ssl_socket 487: require 'openssl' unless defined?(OpenSSL) 488: ctx = OpenSSL::SSL::SSLContext.new 489: 490: # For client certificate authentication: 491: # key_path = ENV["STOMP_KEY_PATH"] || "~/stomp_keys" 492: # ctx.cert = OpenSSL::X509::Certificate.new("#{key_path}/client.cer") 493: # ctx.key = OpenSSL::PKey::RSA.new("#{key_path}/client.keystore") 494: 495: # For server certificate authentication: 496: # truststores = OpenSSL::X509::Store.new 497: # truststores.add_file("#{key_path}/client.ts") 498: # ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER 499: # ctx.cert_store = truststores 500: 501: ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE 502: 503: ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx) 504: def ssl.ready? 505: ! @rbuffer.empty? || @io.ready? 506: end 507: ssl.connect 508: ssl 509: end
# File lib/stomp/connection.rb, line 480 480: def open_tcp_socket 481: tcp_socket = TCPSocket.open @host, @port 482: 483: tcp_socket 484: end
# File lib/stomp/connection.rb, line 428 428: def parse_char(char) 429: RUBY_VERSION > '1.9' ? char : char.chr 430: end
# File lib/stomp/connection.rb, line 432 432: def transmit(command, headers = {}, body = '') 433: # The transmit may fail so we may need to retry. 434: while TRUE 435: begin 436: used_socket = socket 437: _transmit(used_socket, command, headers, body) 438: return 439: rescue Stomp::Error::MaxReconnectAttempts => e 440: raise 441: rescue 442: @failure = $! 443: raise unless @reliable 444: errstr = "transmit to #{@host} failed: #{$!}\n" 445: if @logger && @logger.respond_to?(:on_miscerr) 446: @logger.on_miscerr(log_params, errstr) 447: else 448: $stderr.print errstr 449: end 450: end 451: end 452: end