Class Stomp::Connection
In: lib/stomp/connection.rb
Parent: Object
RuntimeError InvalidFormat InvalidMessageLength PacketParsingTimeout MaxReconnectAttempts InvalidServerCommand Client Connection Message lib/stomp/client.rb lib/stomp/connection.rb lib/stomp/message.rb Version lib/stomp/errors.rb Error Stomp dot/m_6_0.png

Low level connection which maps commands and supports synchronous receives

Methods

Attributes

connection_frame  [R] 
disconnect_receipt  [R] 

Public Class methods

alias :obj_send :send

[Source]

    # File lib/stomp/connection.rb, line 14
14:     def self.default_port(ssl)
15:       ssl ? 61612 : 61613
16:     end

A new Connection object accepts the following parameters:

  login             (String,  default : '')
  passcode          (String,  default : '')
  host              (String,  default : 'localhost')
  port              (Integer, default : 61613)
  reliable          (Boolean, default : false)
  reconnect_delay   (Integer, default : 5)

  e.g. c = Connection.new("username", "password", "localhost", 61613, true)

Hash:

  hash = {
    :hosts => [
      {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
      {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
    ],
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 0,
    :randomize => false,
    :backup => false,
    :timeout => -1,
    :connect_headers => {},
    :parse_timeout => 5,
    :logger => nil,
  }

  e.g. c = Connection.new(hash)

TODO Stomp URL :

  A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

  stomp://host:port
  stomp://host.domain.tld:port
  stomp://user:pass@host:port
  stomp://user:pass@host.domain.tld:port

[Source]

    # File lib/stomp/connection.rb, line 60
60:     def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
61:       @received_messages = []
62: 
63:       if login.is_a?(Hash)
64:         hashed_initialize(login)
65:       else
66:         @host = host
67:         @port = port
68:         @login = login
69:         @passcode = passcode
70:         @reliable = reliable
71:         @reconnect_delay = reconnect_delay
72:         @connect_headers = connect_headers
73:         @ssl = false
74:         @parameters = nil
75:         @parse_timeout = 5              # To override, use hashed parameters
76:         @logger = nil                   # To override, use hashed parameters
77:       end
78:       
79:       # Use Mutexes:  only one lock per each thread
80:       # Revert to original implementation attempt
81:       @transmit_semaphore = Mutex.new
82:       @read_semaphore = Mutex.new
83:       @socket_semaphore = Mutex.new
84:       
85:       @subscriptions = {}
86:       @failure = nil
87:       @connection_attempts = 0
88:       
89:       socket
90:     end

Syntactic sugar for ‘Connection.new’ See ‘initialize’ for usage.

[Source]

     # File lib/stomp/connection.rb, line 108
108:     def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
109:       Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
110:     end

Public Instance methods

Receive a frame, block until the frame is received

[Source]

     # File lib/stomp/connection.rb, line 340
340:     def __old_receive
341:       # The recive my fail so we may need to retry.
342:       while TRUE
343:         begin
344:           used_socket = socket
345:           return _receive(used_socket)
346:         rescue
347:           @failure = $!
348:           raise unless @reliable
349:           errstr = "receive failed: #{$!}"
350:           if @logger && @logger.respond_to?(:on_miscerr)
351:             @logger.on_miscerr(log_params, errstr)
352:           else
353:             $stderr.print errstr
354:           end
355:         end
356:       end
357:     end

Abort a transaction by name

[Source]

     # File lib/stomp/connection.rb, line 234
234:     def abort(name, headers = {})
235:       headers[:transaction] = name
236:       transmit("ABORT", headers)
237:     end

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client‘g

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

     # File lib/stomp/connection.rb, line 222
222:     def ack(message_id, headers = {})
223:       headers['message-id'] = message_id
224:       transmit("ACK", headers)
225:     end

Begin a transaction, requires a name for the transaction

[Source]

     # File lib/stomp/connection.rb, line 213
213:     def begin(name, headers = {})
214:       headers[:transaction] = name
215:       transmit("BEGIN", headers)
216:     end

[Source]

     # File lib/stomp/connection.rb, line 175
175:     def change_host
176:       @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]
177:       
178:       # Set first as master and send it to the end of array
179:       current_host = @parameters[:hosts].shift
180:       @parameters[:hosts] << current_host
181:       
182:       @ssl = current_host[:ssl]
183:       @host = current_host[:host]
184:       @port = current_host[:port] || Connection::default_port(@ssl)
185:       @login = current_host[:login] || ""
186:       @passcode = current_host[:passcode] || ""
187:       
188:     end

[Source]

     # File lib/stomp/connection.rb, line 314
314:     def client_ack?(message)
315:       headers = @subscriptions[message.headers[:destination]]
316:       !headers.nil? && headers[:ack] == "client"
317:     end

Is this connection closed?

[Source]

     # File lib/stomp/connection.rb, line 208
208:     def closed?
209:       @closed
210:     end

Commit a transaction by name

[Source]

     # File lib/stomp/connection.rb, line 228
228:     def commit(name, headers = {})
229:       headers[:transaction] = name
230:       transmit("COMMIT", headers)
231:     end

Close this connection

[Source]

     # File lib/stomp/connection.rb, line 320
320:     def disconnect(headers = {})
321:       transmit("DISCONNECT", headers)
322:       headers = headers.symbolize_keys
323:       @disconnect_receipt = receive if headers[:receipt]
324:       if @logger && @logger.respond_to?(:on_disconnect)
325:         @logger.on_disconnect(log_params)
326:       end
327:       close_socket
328:     end

[Source]

     # File lib/stomp/connection.rb, line 92
 92:     def hashed_initialize(params)
 93:       
 94:       @parameters = refine_params(params)
 95:       @reliable = true
 96:       @reconnect_delay = @parameters[:initial_reconnect_delay]
 97:       @connect_headers = @parameters[:connect_headers]
 98:       @parse_timeout =  @parameters[:parse_timeout]
 99:       @logger =  @parameters[:logger]
100:       #sets the first host to connect
101:       change_host
102:       if @logger && @logger.respond_to?(:on_connecting)            
103:         @logger.on_connecting(log_params)
104:       end
105:     end

[Source]

     # File lib/stomp/connection.rb, line 194
194:     def increase_reconnect_delay
195: 
196:       @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] 
197:       @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]
198:       
199:       @reconnect_delay
200:     end

[Source]

     # File lib/stomp/connection.rb, line 190
190:     def max_reconnect_attempts?
191:       !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
192:     end

[Source]

     # File lib/stomp/connection.rb, line 270
270:     def obj_send(*args)
271:       __send__(*args)
272:     end

Is this connection open?

[Source]

     # File lib/stomp/connection.rb, line 203
203:     def open?
204:       !@closed
205:     end

Return a pending message if one is available, otherwise return nil

[Source]

     # File lib/stomp/connection.rb, line 332
332:     def poll
333:       # No need for a read lock here.  The receive method eventually fullfills
334:       # that requirement.
335:       return nil if @socket.nil? || !@socket.ready?
336:       receive
337:     end

Publish message to destination

To disable content length header ( :suppress_content_length => true ) Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

     # File lib/stomp/connection.rb, line 265
265:     def publish(destination, message, headers = {})
266:       headers[:destination] = destination
267:       transmit("SEND", headers, message)
268:     end

[Source]

     # File lib/stomp/connection.rb, line 359
359:     def receive
360:       super_result = __old_receive
361:       if super_result.nil? && @reliable
362:         errstr = "connection.receive returning EOF as nil - resetting connection.\n"
363:         if @logger && @logger.respond_to?(:on_miscerr)
364:           @logger.on_miscerr(log_params, errstr)
365:         else
366:           $stderr.print errstr
367:         end
368:         @socket = nil
369:         super_result = __old_receive
370:       end
371:       return super_result
372:     end

[Source]

     # File lib/stomp/connection.rb, line 153
153:     def refine_params(params)
154:       params = params.uncamelize_and_symbolize_keys
155:       
156:       default_params = {
157:         :connect_headers => {},
158:         # Failover parameters
159:         :initial_reconnect_delay => 0.01,
160:         :max_reconnect_delay => 30.0,
161:         :use_exponential_back_off => true,
162:         :back_off_multiplier => 2,
163:         :max_reconnect_attempts => 0,
164:         :randomize => false,
165:         :backup => false,
166:         :timeout => -1,
167:         # Parse Timeout
168:         :parse_timeout => 5
169:       }
170:       
171:       default_params.merge(params)
172:         
173:     end

[Source]

     # File lib/stomp/connection.rb, line 274
274:     def send(*args)
275:       warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
276:       publish(*args)
277:     end

[Source]

     # File lib/stomp/connection.rb, line 112
112:     def socket
113:       @socket_semaphore.synchronize do
114:         used_socket = @socket
115:         used_socket = nil if closed?
116:         
117:         while used_socket.nil? || !@failure.nil?
118:           @failure = nil
119:           begin
120:             used_socket = open_socket
121:             # Open complete
122:             
123:             connect(used_socket)
124:             if @logger && @logger.respond_to?(:on_connected)
125:               @logger.on_connected(log_params) 
126:             end
127:             @connection_attempts = 0
128:           rescue
129:             @failure = $!
130:             used_socket = nil
131:             raise unless @reliable
132:             if @logger && @logger.respond_to?(:on_connectfail)            
133:               @logger.on_connectfail(log_params) 
134:             else
135:               $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
136:             end
137:             raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?
138: 
139:             sleep(@reconnect_delay)
140:             
141:             @connection_attempts += 1
142:             
143:             if @parameters
144:               change_host
145:               increase_reconnect_delay
146:             end
147:           end
148:         end
149:         @socket = used_socket
150:       end
151:     end

Subscribe to a destination, must specify a name

[Source]

     # File lib/stomp/connection.rb, line 240
240:     def subscribe(name, headers = {}, subId = nil)
241:       headers[:destination] = name
242:       transmit("SUBSCRIBE", headers)
243: 
244:       # Store the sub so that we can replay if we reconnect.
245:       if @reliable
246:         subId = name if subId.nil?
247:         @subscriptions[subId] = headers
248:       end
249:     end

Send a message back to the source or to the dead letter queue

Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" ) Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ) Accepts a force client acknowledgement option (:force_client_ack => true)

[Source]

     # File lib/stomp/connection.rb, line 284
284:     def unreceive(message, options = {})
285:       options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options
286:       # Lets make sure all keys are symbols
287:       message.headers = message.headers.symbolize_keys
288:       
289:       retry_count = message.headers[:retry_count].to_i || 0
290:       message.headers[:retry_count] = retry_count + 1
291:       transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
292:       message_id = message.headers.delete('message-id''message-id')
293:       
294:       begin
295:         self.begin transaction_id
296:         
297:         if client_ack?(message) || options[:force_client_ack]
298:           self.ack(message_id, :transaction => transaction_id)
299:         end
300:         
301:         if retry_count <= options[:max_redeliveries]
302:           self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id))
303:         else
304:           # Poison ack, sending the message to the DLQ
305:           self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true))
306:         end
307:         self.commit transaction_id
308:       rescue Exception => exception
309:         self.abort transaction_id
310:         raise exception
311:       end
312:     end

Unsubscribe from a destination, must specify a name

[Source]

     # File lib/stomp/connection.rb, line 252
252:     def unsubscribe(name, headers = {}, subId = nil)
253:       headers[:destination] = name
254:       transmit("UNSUBSCRIBE", headers)
255:       if @reliable
256:         subId = name if subId.nil?
257:         @subscriptions.delete(subId)
258:       end
259:     end

Private Instance methods

[Source]

     # File lib/stomp/connection.rb, line 376
376:       def _receive( read_socket )
377:         @read_semaphore.synchronize do
378:           line = read_socket.gets
379: 
380:           return nil if line.nil?
381: 
382:           # If the reading hangs for more than X seconds, abort the parsing process.
383:           # X defaults to 5.  Override allowed in connection hash parameters.
384:           Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do
385:             # Reads the beginning of the message until it runs into a empty line
386:             message_header = ''
387:             begin
388:               message_header += line
389:               line = read_socket.gets
390:             end until line =~ /^\s?\n$/
391: 
392:             # Checks if it includes content_length header
393:             content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
394:             message_body = ''
395: 
396:             # If it does, reads the specified amount of bytes
397:             char = ''
398:             if content_length
399:               message_body = read_socket.read content_length[1].to_i
400:               raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
401:             # Else reads, the rest of the message until the first \0
402:             else
403:               message_body += char while (char = parse_char(read_socket.getc)) != "\0"
404:             end
405: 
406:             # If the buffer isn't empty, reads trailing new lines.
407:             # Note: experiments with JRuby seem to show that .ready? never
408:             # returns true.  This means that this code to drain trailing new
409:             # lines never runs using JRuby.
410:             while read_socket.ready?
411:               last_char = read_socket.getc
412:               break unless last_char
413:               if parse_char(last_char) != "\n"
414:                 read_socket.ungetc(last_char)
415:                 break
416:               end
417:             end
418:             # And so, a JRuby hack.  Remove any new lines at the start of the 
419:             # next buffer.
420:             message_header.gsub!(/^\n?/, "")
421: 
422:             # Adds the excluded \n and \0 and tries to create a new message with it
423:             Message.new(message_header + "\n" + message_body + "\0")
424:           end
425:         end
426:       end

[Source]

     # File lib/stomp/connection.rb, line 454
454:       def _transmit(used_socket, command, headers = {}, body = '')
455:         @transmit_semaphore.synchronize do
456:           # Handle nil body
457:           body = '' if body.nil?
458:           # The content-length should be expressed in bytes.
459:           # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters
460:           # With Unicode strings, # of bytes != # of characters.  So, use String#bytesize when available.
461:           body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length
462:  
463:           # ActiveMQ interprets every message as a BinaryMessage 
464:           # if content_length header is included. 
465:           # Using :suppress_content_length => true will suppress this behaviour
466:           # and ActiveMQ will interpret the message as a TextMessage.
467:           # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/
468:           # Lets send this header in the message, so it can maintain state when using unreceive
469:           headers['content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length]
470:           
471:           used_socket.puts command  
472:           headers.each {|k,v| used_socket.puts "#{k}:#{v}" }
473:           used_socket.puts "content-type: text/plain; charset=UTF-8"
474:           used_socket.puts
475:           used_socket.write body
476:           used_socket.write "\0"
477:         end
478:       end

[Source]

     # File lib/stomp/connection.rb, line 511
511:       def  close_socketclose_socket
512:         begin
513:           @socket.close
514:         rescue
515:           #Ignoring if already closed
516:         end
517: 
518:         @closed = true
519:       end

[Source]

     # File lib/stomp/connection.rb, line 532
532:       def connect(used_socket)
533:         headers = @connect_headers.clone
534:         headers[:login] = @login
535:         headers[:passcode] = @passcode
536:         _transmit(used_socket, "CONNECT", headers)
537:         @connection_frame = _receive(used_socket)
538:         @disconnect_receipt = nil
539:         # replay any subscriptions.
540:         @subscriptions.each { |k,v| _transmit(used_socket, "SUBSCRIBE", v) }
541:       end

[Source]

     # File lib/stomp/connection.rb, line 543
543:       def log_params
544:         lparms = @parameters.clone
545:         lparms[:cur_host] = @host
546:         lparms[:cur_port] = @port
547:         lparms[:cur_login] = @login
548:         lparms[:cur_passcode] = @passcode
549:         lparms[:cur_ssl] = @ssl
550:         lparms[:cur_recondelay] = @reconnect_delay
551:         lparms[:cur_parseto] = @parse_timeout
552:         lparms[:cur_conattempts] = @connection_attempts
553:         #
554:         lparms
555:       end

[Source]

     # File lib/stomp/connection.rb, line 521
521:       def open_socket
522:         used_socket = @ssl ? open_ssl_socket : open_tcp_socket
523:         # try to close the old connection if any
524:         close_socket
525:         
526:         @closed = false
527:         # Use keepalive
528:         used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
529:         used_socket
530:       end

[Source]

     # File lib/stomp/connection.rb, line 486
486:       def open_ssl_socket
487:         require 'openssl' unless defined?(OpenSSL)
488:         ctx = OpenSSL::SSL::SSLContext.new
489: 
490:         # For client certificate authentication:
491:         # key_path = ENV["STOMP_KEY_PATH"] || "~/stomp_keys"
492:         # ctx.cert = OpenSSL::X509::Certificate.new("#{key_path}/client.cer")
493:         # ctx.key = OpenSSL::PKey::RSA.new("#{key_path}/client.keystore")
494: 
495:         # For server certificate authentication:
496:         # truststores = OpenSSL::X509::Store.new
497:         # truststores.add_file("#{key_path}/client.ts")
498:         # ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
499:         # ctx.cert_store = truststores
500: 
501:         ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE  
502: 
503:         ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx)
504:         def ssl.ready?
505:           ! @rbuffer.empty? || @io.ready?
506:         end
507:         ssl.connect
508:         ssl
509:       end

[Source]

     # File lib/stomp/connection.rb, line 480
480:       def open_tcp_socket
481:         tcp_socket = TCPSocket.open @host, @port
482: 
483:         tcp_socket
484:       end

[Source]

     # File lib/stomp/connection.rb, line 428
428:       def parse_char(char)
429:         RUBY_VERSION > '1.9' ? char : char.chr
430:       end

[Source]

     # File lib/stomp/connection.rb, line 432
432:       def transmit(command, headers = {}, body = '')
433:         # The transmit may fail so we may need to retry.
434:         while TRUE
435:           begin
436:             used_socket = socket
437:             _transmit(used_socket, command, headers, body)
438:             return
439:           rescue Stomp::Error::MaxReconnectAttempts => e
440:               raise
441:           rescue
442:             @failure = $!
443:             raise unless @reliable
444:             errstr = "transmit to #{@host} failed: #{$!}\n"
445:             if @logger && @logger.respond_to?(:on_miscerr)
446:               @logger.on_miscerr(log_params, errstr)
447:             else
448:               $stderr.print errstr
449:             end
450:           end
451:         end
452:       end

[Validate]