Class Stomp::Client
In: lib/stomp/client.rb
Parent: Object
RuntimeError InvalidFormat InvalidMessageLength PacketParsingTimeout MaxReconnectAttempts InvalidServerCommand Client Connection Message lib/stomp/client.rb lib/stomp/connection.rb lib/stomp/message.rb Version lib/stomp/errors.rb Error Stomp dot/m_6_0.png

Typical Stomp client class. Uses a listener thread to receive frames from the server, any thread can send.

Receives all happen in one thread, so consider not doing much processing in that thread if you have much message volume.

Methods

Attributes

host  [R] 
login  [R] 
parameters  [R] 
passcode  [R] 
port  [R] 
reliable  [R] 

Public Class methods

A new Client object can be initialized using two forms:

Standard positional parameters:

  login     (String,  default : '')
  passcode  (String,  default : '')
  host      (String,  default : 'localhost')
  port      (Integer, default : 61613)
  reliable  (Boolean, default : false)

  e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL :

  A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

  stomp://host:port
  stomp://host.domain.tld:port
  stomp://login:passcode@host:port
  stomp://login:passcode@host.domain.tld:port

[Source]

    # File lib/stomp/client.rb, line 36
36:     def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
37: 
38:       # Parse stomp:// URL's or set params
39:       if login.is_a?(Hash)
40:         @parameters = login
41:         
42:         first_host = @parameters[:hosts][0]
43:         
44:         @login = first_host[:login]
45:         @passcode = first_host[:passcode]
46:         @host = first_host[:host]
47:         @port = first_host[:port] || Connection::default_port(first_host[:ssl])
48:         
49:         @reliable = true
50:         
51:       elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port
52:         @login = $2 || ""
53:         @passcode = $3 || ""
54:         @host = $4
55:         @port = $5.to_i
56:         @reliable = false
57:       elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
58: 
59:         first_host = {}
60:         first_host[:ssl] = !$2.nil?
61:         @login = first_host[:login] = $4 || ""
62:         @passcode = first_host[:passcode] = $5 || ""
63:         @host = first_host[:host] = $6
64:         @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl])
65:         
66:         options = $16 || ""
67:         parts = options.split(/&|=/)
68:         options = Hash[*parts]
69:         
70:         hosts = [first_host] + parse_hosts(login)
71:         
72:         @parameters = {}
73:         @parameters[:hosts] = hosts
74:         
75:         @parameters.merge! filter_options(options)
76:                 
77:         @reliable = true
78:       else
79:         @login = login
80:         @passcode = passcode
81:         @host = host
82:         @port = port.to_i
83:         @reliable = reliable
84:       end
85: 
86:       check_arguments!
87: 
88:       @id_mutex = Mutex.new
89:       @ids = 1
90: 
91:       if @parameters
92:         @connection = Connection.new(@parameters)
93:       else
94:         @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
95:       end
96:       
97:       start_listeners
98: 
99:     end

Syntactic sugar for ‘Client.new’ See ‘initialize’ for usage.

[Source]

     # File lib/stomp/client.rb, line 102
102:     def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
103:       Client.new(login, passcode, host, port, reliable)
104:     end

Public Instance methods

Abort a transaction by name

[Source]

     # File lib/stomp/client.rb, line 118
118:     def abort(name, headers = {})
119:       @connection.abort(name, headers)
120: 
121:       # lets replay any ack'd messages in this transaction
122:       replay_list = @replay_messages_by_txn[name]
123:       if replay_list
124:         replay_list.each do |message|
125:           if listener = find_listener(message)
126:             listener.call(message)
127:           end
128:         end
129:       end
130:     end

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client‘g

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

     # File lib/stomp/client.rb, line 167
167:     def acknowledge(message, headers = {})
168:       txn_id = headers[:transaction]
169:       if txn_id
170:         # lets keep around messages ack'd in this transaction in case we rollback
171:         replay_list = @replay_messages_by_txn[txn_id]
172:         if replay_list.nil?
173:           replay_list = []
174:           @replay_messages_by_txn[txn_id] = replay_list
175:         end
176:         replay_list << message
177:       end
178:       if block_given?
179:         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
180:       end
181:       @connection.ack message.headers['message-id'], headers
182:     end

Begin a transaction by name

[Source]

     # File lib/stomp/client.rb, line 113
113:     def begin(name, headers = {})
114:       @connection.begin(name, headers)
115:     end

Close out resources in use by this client

[Source]

     # File lib/stomp/client.rb, line 231
231:     def close headers={}
232:       @listener_thread.exit
233:       @connection.disconnect headers
234:     end

Is this client closed?

[Source]

     # File lib/stomp/client.rb, line 226
226:     def closed?
227:       @connection.closed?
228:     end

Commit a transaction by name

[Source]

     # File lib/stomp/client.rb, line 133
133:     def commit(name, headers = {})
134:       txn_id = headers[:transaction]
135:       @replay_messages_by_txn.delete(txn_id)
136:       @connection.commit(name, headers)
137:     end

[Source]

     # File lib/stomp/client.rb, line 212
212:     def connection_frame
213:       @connection.connection_frame
214:     end

[Source]

     # File lib/stomp/client.rb, line 216
216:     def disconnect_receipt
217:       @connection.disconnect_receipt
218:     end

Join the listener thread for this client, generally used to wait for a quit signal

[Source]

     # File lib/stomp/client.rb, line 108
108:     def join(limit = nil)
109:       @listener_thread.join(limit)
110:     end

[Source]

     # File lib/stomp/client.rb, line 203
203:     def obj_send(*args)
204:       __send__(*args)
205:     end

Is this client open?

[Source]

     # File lib/stomp/client.rb, line 221
221:     def open?
222:       @connection.open?
223:     end

Publishes message to destination

If a block is given a receipt will be requested and passed to the block on receipt

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

     # File lib/stomp/client.rb, line 196
196:     def publish(destination, message, headers = {})
197:       if block_given?
198:         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
199:       end
200:       @connection.publish(destination, message, headers)
201:     end

Check if the thread was created and isn‘t dead

[Source]

     # File lib/stomp/client.rb, line 237
237:     def running
238:       @listener_thread && !!@listener_thread.status
239:     end

[Source]

     # File lib/stomp/client.rb, line 207
207:     def send(*args)
208:       warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
209:       publish(*args)
210:     end

Subscribe to a destination, must be passed a block which will be used as a callback listener

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

     # File lib/stomp/client.rb, line 143
143:     def subscribe(destination, headers = {})
144:       raise "No listener given" unless block_given?
145:       # use subscription id to correlate messages to subscription. As described in
146:       # the SUBSCRIPTION section of the protocol: http://stomp.codehaus.org/Protocol.
147:       # If no subscription id is provided, generate one.
148:       set_subscription_id_if_missing(destination, headers)
149:       if @listeners[headers[:id]]
150:         raise "attempting to subscribe to a queue with a previous subscription"
151:       end
152:       @listeners[headers[:id]] = lambda {|msg| yield msg}
153:       @connection.subscribe(destination, headers)
154:     end

Unreceive a message, sending it back to its queue or to the DLQ

[Source]

     # File lib/stomp/client.rb, line 186
186:     def unreceive(message, options = {})
187:       @connection.unreceive(message, options)
188:     end

Unsubecribe from a channel

[Source]

     # File lib/stomp/client.rb, line 157
157:     def unsubscribe(name, headers = {})
158:       set_subscription_id_if_missing(name, headers)
159:       @connection.unsubscribe(name, headers)
160:       @listeners[headers[:id]] = nil
161:     end

Private Instance methods

[Source]

     # File lib/stomp/client.rb, line 286
286:       def check_arguments!
287:         raise ArgumentError if @host.nil? || @host.empty?
288:         raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535
289:         raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass)
290:       end

[Source]

     # File lib/stomp/client.rb, line 292
292:       def filter_options(options)
293:         new_options = {}
294:         new_options[:initial_reconnect_delay] = (options["initialReconnectDelay"] || 10).to_f / 1000 # In ms
295:         new_options[:max_reconnect_delay] = (options["maxReconnectDelay"] || 30000 ).to_f / 1000 # In ms
296:         new_options[:use_exponential_back_off] = !(options["useExponentialBackOff"] == "false") # Default: true
297:         new_options[:back_off_multiplier] = (options["backOffMultiplier"] || 2 ).to_i
298:         new_options[:max_reconnect_attempts] = (options["maxReconnectAttempts"] || 0 ).to_i
299:         new_options[:randomize] = options["randomize"] == "true" # Default: false
300:         new_options[:backup] = false # Not implemented yet: I'm using a master X slave solution
301:         new_options[:timeout] = -1 # Not implemented yet: a "timeout(5) do ... end" would do the trick, feel free
302:         
303:         new_options
304:       end

[Source]

     # File lib/stomp/client.rb, line 306
306:       def find_listener(message)
307:         subscription_id = message.headers['subscription']
308:         if subscription_id == nil
309:           # For backward compatibility, some messages may already exist with no
310:           # subscription id, in which case we can attempt to synthesize one.
311:           set_subscription_id_if_missing(message.headers['destination'], message.headers)
312:           subscription_id = message.headers['id']
313:         end
314:         @listeners[subscription_id]
315:       end

[Source]

     # File lib/stomp/client.rb, line 268
268:       def parse_hosts(url)
269:         hosts = []
270:         
271:         host_match = /stomp(\+ssl)?:\/\/(([\w\.]*):(\w*)@)?([\w\.]+):(\d+)\)/
272:         url.scan(host_match).each do |match|
273:           host = {}
274:           host[:ssl] = !match[0].nil?
275:           host[:login] =  match[2] || ""
276:           host[:passcode] = match[3] || ""
277:           host[:host] = match[4]
278:           host[:port] = match[5].to_i
279:           
280:           hosts << host
281:         end
282:         
283:         hosts
284:       end

[Source]

     # File lib/stomp/client.rb, line 253
253:       def register_receipt_listener(listener)
254:         id = -1
255:         @id_mutex.synchronize do
256:           id = @ids.to_s
257:           @ids = @ids.succ
258:         end
259:         @receipt_listeners[id] = listener
260:         id
261:       end

Set a subscription id in the headers hash if one does not already exist. For simplicities sake, all subscriptions have a subscription ID. setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.codehaus.org/Protocol

[Source]

     # File lib/stomp/client.rb, line 246
246:       def set_subscription_id_if_missing(destination, headers)
247:         headers[:id] = headers[:id] ? headers[:id] : headers['id'] 
248:         if headers[:id] == nil
249:           headers[:id] = Digest::SHA1.hexdigest(destination)
250:         end
251:       end

[Source]

     # File lib/stomp/client.rb, line 317
317:       def start_listeners
318:         @listeners = {}
319:         @receipt_listeners = {}
320:         @replay_messages_by_txn = {}
321: 
322:         @listener_thread = Thread.start do
323:           while true
324:             message = @connection.receive
325:             if message.command == 'MESSAGE'
326:               if listener = find_listener(message)
327:                 listener.call(message)
328:               end
329:             elsif message.command == 'RECEIPT'
330:               if listener = @receipt_listeners[message.headers['receipt-id']]
331:                 listener.call(message)
332:               end
333:             end
334:           end
335:         end
336:         
337:       end

e.g. login:passcode@host:port or host:port

[Source]

     # File lib/stomp/client.rb, line 264
264:       def url_regex
265:         '(([\w\.\-]*):(\w*)@)?([\w\.\-]+):(\d+)'
266:       end

[Validate]