Class | RightAws::SqsInterface |
In: | lib/sqs/right_sqs_interface.rb |
Parent: | RightAwsBase |
API_VERSION | = | "2007-05-01" |
DEFAULT_HOST | = | "queue.amazonaws.com" |
DEFAULT_PORT | = | 443 |
DEFAULT_PROTOCOL | = | 'https' |
REQUEST_TTL | = | 30 |
DEFAULT_VISIBILITY_TIMEOUT | = | 30 |
Creates a new SqsInterface instance.
sqs = RightAws::SqsInterface.new('1E3GDYEOGFJPIT75KDT40','hgTHt68JY07JKUY08ftHYtERkjgtfERn57DFE379', {:multi_thread => true, :logger => Logger.new('/tmp/x.log')})
Params is a hash:
{ :server            => 'queue.amazonaws.com'  # Amazon service host: 'queue.amazonaws.com' (default)
  :port              => 443                    # Amazon service port: 80 or 443 (default)
  :multi_thread      => true|false             # Multi-threaded (connection per each thread): true or false (default)
  :signature_version => '0'                    # The signature version: '0' or '1' (default)
  :logger            => Logger Object }        # Logger instance: logs to STDOUT if omitted
# File lib/sqs/right_sqs_interface.rb, line 62
def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, params={})
  init({ :name             => 'SQS',
         :default_host     => ENV['SQS_URL'] ? URI.parse(ENV['SQS_URL']).host   : DEFAULT_HOST,
         :default_port     => ENV['SQS_URL'] ? URI.parse(ENV['SQS_URL']).port   : DEFAULT_PORT,
         :default_protocol => ENV['SQS_URL'] ? URI.parse(ENV['SQS_URL']).scheme : DEFAULT_PROTOCOL },
       aws_access_key_id     || ENV['AWS_ACCESS_KEY_ID'],
       aws_secret_access_key || ENV['AWS_SECRET_ACCESS_KEY'],
       params)
end
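The endpoint defaults chosen by the constructor can be tried in isolation. The sketch below mirrors the ENV['SQS_URL'] handling from the listing; resolve_endpoint and SKETCH_DEFAULTS are hypothetical names introduced for this example (they are not part of RightAws), and the environment is passed in as a plain hash so the real ENV is left untouched.

```ruby
require 'uri'

# Defaults copied from the constants listed at the top of this page.
SKETCH_DEFAULTS = {
  :host     => 'queue.amazonaws.com',
  :port     => 443,
  :protocol => 'https'
}.freeze

# Hypothetical helper, not part of RightAws: returns the host, port and
# protocol that would be used for a given environment hash.
def resolve_endpoint(env)
  return SKETCH_DEFAULTS.dup unless env['SQS_URL']

  uri = URI.parse(env['SQS_URL'])
  { :host => uri.host, :port => uri.port, :protocol => uri.scheme }
end

p resolve_endpoint({})
p resolve_endpoint({ 'SQS_URL' => 'http://localhost:9324' })
```

The URL http://localhost:9324 is only a placeholder for a local test endpoint.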
Returns short queue name by url.
RightAws::SqsInterface.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'
# File lib/sqs/right_sqs_interface.rb, line 381
def self.queue_name_by_url(queue_url)
  queue_url[/[^\/]*$/]
rescue
  on_exception
end
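The regular expression in this listing takes the run of non-slash characters at the end of the string. A minimal sketch of that behaviour follows; short_queue_name is a hypothetical stand-alone helper used only for illustration.

```ruby
# Same pattern as in the listing: zero or more non-slash characters
# anchored at the end of the line.
QUEUE_NAME_PATTERN = /[^\/]*$/

# Hypothetical helper, not part of RightAws.
def short_queue_name(queue_url)
  queue_url[QUEUE_NAME_PATTERN]
end

puts short_queue_name('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue')
# prints my_awesome_queue

# A trailing slash leaves nothing after the last '/', so the result is empty.
p short_queue_name('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue/')
# prints ""
```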
Adds a grant for a user (identified by the email address registered with Amazon). Returns true or raises an exception. Permission = 'FULLCONTROL' | 'RECEIVEMESSAGE' | 'SENDMESSAGE'.
sqs.add_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'my_awesome_friend@gmail.com', 'FULLCONTROL') #=> true
# File lib/sqs/right_sqs_interface.rb, line 239
def add_grant(queue_url, grantee_email_address, permission = nil)
  req_hash = generate_request('AddGrant',
                              'Grantee.EmailAddress' => grantee_email_address,
                              'Permission'           => permission,
                              :queue_url             => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
Changes the visibility timeout of a message. Returns true or raises an exception.
sqs.change_message_visibility('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321', 10) #=> true
# File lib/sqs/right_sqs_interface.rb, line 352
def change_message_visibility(queue_url, message_id, visibility_timeout=0)
  req_hash = generate_request('ChangeMessageVisibility',
                              'MessageId'         => message_id,
                              'VisibilityTimeout' => visibility_timeout.to_s,
                              :queue_url          => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
Removes all visible messages from the queue. Returns true or raises an exception.
sqs.clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
# File lib/sqs/right_sqs_interface.rb, line 411
def clear_queue(queue_url)
  while (m = pop_message(queue_url)) ; end   # delete all messages in queue
  true
rescue
  on_exception
end
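The loop in clear_queue keeps popping until pop_message returns nil. The sketch below reproduces that drain pattern against a toy in-memory queue; ToyQueue and drain are hypothetical names used only for this illustration and do not model visibility timeouts or any other SQS behaviour.

```ruby
# Toy stand-in for a queue (an assumption for illustration only).
class ToyQueue
  def initialize(bodies)
    @messages = bodies.dup
  end

  # Removes and returns the first message, or nil when nothing is left.
  def pop_message
    @messages.shift
  end

  def length
    @messages.length
  end
end

# Pops until the queue reports no message; returns how many were removed.
def drain(queue)
  removed = 0
  removed += 1 while queue.pop_message
  removed
end

queue = ToyQueue.new(%w[message_1 message_2 message_3])
puts drain(queue)   # prints 3
puts queue.length   # prints 0
```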
Creates a new queue. Returns the URL of the new queue.
sqs.create_queue('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'
P.S. Some queue-based requests may not become available until a couple of minutes after queue creation (permission grant and removal, for example).
# File lib/sqs/right_sqs_interface.rb, line 144
def create_queue(queue_name, default_visibility_timeout=nil)
  req_hash = generate_request('CreateQueue',
                              'QueueName'                => queue_name,
                              'DefaultVisibilityTimeout' => default_visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT )
  request_info(req_hash, SqsCreateQueueParser.new(:logger => @logger))
end
Deletes a message from the queue. Returns true or raises an exception. Amazon returns true even when the message does not exist.
sqs.delete_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '12345678904...0987654321') #=> true
# File lib/sqs/right_sqs_interface.rb, line 339
def delete_message(queue_url, message_id)
  req_hash = generate_request('DeleteMessage',
                              'MessageId' => message_id,
                              :queue_url  => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
Deletes the queue (the queue must be empty or force_deletion must be set to true). The queue is identified by its URL. Returns true or raises an exception.
sqs.delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2') #=> true
# File lib/sqs/right_sqs_interface.rb, line 168
def delete_queue(queue_url, force_deletion = false)
  req_hash = generate_request('DeleteQueue',
                              'ForceDeletion' => force_deletion.to_s,
                              :queue_url      => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
Deletes the queue, then re-creates it (and restores its attributes). This was the fastest way to clear a big queue or a queue with invisible messages. Returns true or raises an exception.
sqs.force_clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
P.S. This function is no longer supported. Amazon has changed the SQS semantics to require at least 60 seconds between queue deletion and creation, so this method will fail with an exception.
# File lib/sqs/right_sqs_interface.rb, line 425
def force_clear_queue(queue_url)
  queue_name       = queue_name_by_url(queue_url)
  queue_attributes = get_queue_attributes(queue_url)
  force_delete_queue(queue_url)
  create_queue(queue_name)
  # hmmm... The next line is a trick. Amazon do not want change attributes immediately after queue creation
  # So we do 'empty' get_queue_attributes. Probably they need some time to allow attributes change.
  get_queue_attributes(queue_url)
  queue_attributes.each{ |attribute, value| set_queue_attributes(queue_url, attribute, value) }
  true
rescue
  on_exception
end
Deletes the queue even if it has messages. Returns true or raises an exception.
sqs.force_delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true
P.S. Same as delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', true)
# File lib/sqs/right_sqs_interface.rb, line 444
def force_delete_queue(queue_url)
  delete_queue(queue_url, true)
rescue
  on_exception
end
Retrieves the queue attribute(s). Returns a hash of attribute(s) or raises an exception.
sqs.get_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> {"ApproximateNumberOfMessages"=>"0", "VisibilityTimeout"=>"30"}
# File lib/sqs/right_sqs_interface.rb, line 181
def get_queue_attributes(queue_url, attribute='All')
  req_hash = generate_request('GetQueueAttributes',
                              'Attribute' => attribute,
                              :queue_url  => queue_url)
  request_info(req_hash, SqsGetQueueAttributesParser.new(:logger => @logger))
rescue
  on_exception
end
Returns approximate number of messages in queue.
sqs.get_queue_length('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 3
# File lib/sqs/right_sqs_interface.rb, line 401
def get_queue_length(queue_url)
  get_queue_attributes(queue_url)['ApproximateNumberOfMessages'].to_i
rescue
  on_exception
end
Retrieves visibility timeout.
sqs.get_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 15
See also: get_queue_attributes
# File lib/sqs/right_sqs_interface.rb, line 228
def get_visibility_timeout(queue_url)
  req_hash = generate_request('GetVisibilityTimeout', :queue_url => queue_url )
  request_info(req_hash, SqsGetVisibilityTimeoutParser.new(:logger => @logger))
rescue
  on_exception
end
Retrieves a hash of grantee_id => perms for this queue:
sqs.list_grants('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> {"000000000000000000000001111111111117476c7fea6efb2c3347ac3ab2792a"=>{:name=>"root", :perms=>["FULLCONTROL"]}, "00000000000000000000000111111111111e5828344600fc9e4a784a09e97041"=>{:name=>"myawesomefriend", :perms=>["FULLCONTROL"]}}
# File lib/sqs/right_sqs_interface.rb, line 255
def list_grants(queue_url, grantee_email_address=nil, permission = nil)
  req_hash = generate_request('ListGrants',
                              'Grantee.EmailAddress' => grantee_email_address,
                              'Permission'           => permission,
                              :queue_url             => queue_url)
  response = request_info(req_hash, SqsListGrantsParser.new(:logger => @logger))
  # One user may have up to 3 permission records for every queue.
  # We will join these records to one.
  result = {}
  response.each do |perm|
    id = perm[:id]
    # create hash for new user if unexisit
    result[id] = {:perms=>[]} unless result[id]
    # fill current grantee params
    result[id][:perms] << perm[:permission]
    result[id][:name] = perm[:name]
  end
  result
end
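The second half of list_grants folds the flat permission records returned by the parser into one entry per grantee. A self-contained sketch of that merge step follows. merge_grants is a hypothetical helper, and the sample ids are made up; only the record keys (:id, :name, :permission) come from the listing.

```ruby
# Hypothetical helper, not part of RightAws: joins up to three permission
# records per grantee into a single { :perms => [...], :name => ... } entry.
def merge_grants(records)
  result = {}
  records.each do |perm|
    entry = (result[perm[:id]] ||= { :perms => [] })
    entry[:perms] << perm[:permission]
    entry[:name] = perm[:name]
  end
  result
end

# Made-up sample records in the shape the listing reads from the parser.
records = [
  { :id => 'id-root',   :name => 'root',            :permission => 'FULLCONTROL' },
  { :id => 'id-friend', :name => 'myawesomefriend', :permission => 'RECEIVEMESSAGE' },
  { :id => 'id-friend', :name => 'myawesomefriend', :permission => 'SENDMESSAGE' }
]

p merge_grants(records)
```

The grantee with two records ends up with both permissions in a single :perms array.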
Lists all queues owned by this user that have names beginning with queue_name_prefix. If queue_name_prefix is omitted then retrieves a list of all queues.
sqs.create_queue('my_awesome_queue')
sqs.create_queue('my_awesome_queue_2')
sqs.list_queues('my_awesome') #=> ['http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue','http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2']
# File lib/sqs/right_sqs_interface.rb, line 157
def list_queues(queue_name_prefix=nil)
  req_hash = generate_request('ListQueues', 'QueueNamePrefix' => queue_name_prefix)
  request_info(req_hash, SqsListQueuesParser.new(:logger => @logger))
rescue
  on_exception
end
Peeks at a message in the queue by message id. Returns the message in the format {:id=>'message_id', :body=>'message_body'} or nil.
sqs.peek_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321') #=> {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
# File lib/sqs/right_sqs_interface.rb, line 313
def peek_message(queue_url, message_id)
  req_hash = generate_rest_request('GET', :queue_url => "#{queue_url}/#{CGI::escape message_id}" )
  messages = request_info(req_hash, SqsReceiveMessagesParser.new(:logger => @logger))
  messages.blank? ? nil : messages[0]
rescue
  on_exception
end
Pops (retrieves and deletes) the first accessible message from the queue. Returns the message in the format {:id=>'message_id', :body=>'message_body'} or nil.
sqs.pop_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
# File lib/sqs/right_sqs_interface.rb, line 485
def pop_message(queue_url)
  messages = pop_messages(queue_url)
  messages.blank? ? nil : messages[0]
rescue
  on_exception
end
Pops (retrieves and deletes) up to 'number_of_messages' from the queue. Returns an array of retrieved messages in the format [{:id=>'message_id', :body=>'message_body'}].
sqs.pop_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 3) #=> [{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]
# File lib/sqs/right_sqs_interface.rb, line 470
def pop_messages(queue_url, number_of_messages=1)
  messages = receive_messages(queue_url, number_of_messages)
  messages.each do |message|
    delete_message(queue_url, message[:id])
  end
  messages
rescue
  on_exception
end
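As the listing shows, a pop is a receive followed by one delete per received message. The sketch below runs that two-step pattern against an in-memory stand-in; FakeSqs and pop_messages_sketch are hypothetical names written for this example, and the fake imitates only the two calls the pattern needs.

```ruby
# In-memory stand-in for the interface (an assumption for illustration only).
class FakeSqs
  attr_reader :deleted

  def initialize(messages)
    @messages = messages.dup
    @deleted  = []
  end

  # Returns up to number_of_messages without removing them.
  def receive_messages(_queue_url, number_of_messages = 1)
    @messages.first(number_of_messages)
  end

  # Removes the message with the given id and records the deletion.
  def delete_message(_queue_url, message_id)
    @deleted << message_id
    @messages.reject! { |m| m[:id] == message_id }
    true
  end
end

# Same shape as the listing: receive first, then delete each message by id.
def pop_messages_sketch(client, queue_url, number_of_messages = 1)
  messages = client.receive_messages(queue_url, number_of_messages)
  messages.each { |message| client.delete_message(queue_url, message[:id]) }
  messages
end

client = FakeSqs.new([
  { :id => '1', :body => 'message_1' },
  { :id => '2', :body => 'message_2' },
  { :id => '3', :body => 'message_3' }
])
p pop_messages_sketch(client, 'my_awesome_queue', 2).map { |m| m[:body] }
# prints ["message_1", "message_2"]
```

Because the two steps are separate requests, a failure between them leaves a message received but not deleted.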
Returns short queue name by url.
sqs.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'
# File lib/sqs/right_sqs_interface.rb, line 391
def queue_name_by_url(queue_url)
  self.class.queue_name_by_url(queue_url)
rescue
  on_exception
end
Returns the queue URL for a short queue name, or nil if the queue is not found.
sqs.queue_url_by_name('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'
# File lib/sqs/right_sqs_interface.rb, line 366
def queue_url_by_name(queue_name)
  return queue_name if queue_name.include?('/')
  queue_urls = list_queues(queue_name)
  queue_urls.each do |queue_url|
    return queue_url if queue_name_by_url(queue_url) == queue_name
  end
  nil
rescue
  on_exception
end
Reads the first accessible message from the queue. Returns the message as a hash: {:id=>'message_id', :body=>'message_body'} or nil.
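list_queues matches by prefix, so queue_url_by_name still has to compare each returned URL against the exact short name. The sketch below shows that filtering step on a plain array of URLs; find_queue_url is a hypothetical helper that takes the URL list as an argument instead of calling the service.

```ruby
# Hypothetical helper, not part of RightAws. queue_urls stands in for the
# result of a prefix listing such as list_queues(queue_name).
def find_queue_url(queue_urls, queue_name)
  # Anything containing a slash is treated as a URL already.
  return queue_name if queue_name.include?('/')

  # Keep only the URL whose last path segment equals the requested name.
  queue_urls.find { |queue_url| queue_url[/[^\/]*$/] == queue_name }
end

urls = [
  'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2',
  'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'
]

puts find_queue_url(urls, 'my_awesome_queue')
# prints http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue
p find_queue_url(urls, 'my_awesome')
# prints nil
```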
sqs.receive_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 10) #=> {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}
# File lib/sqs/right_sqs_interface.rb, line 455
def receive_message(queue_url, visibility_timeout=nil)
  result = receive_messages(queue_url, 1, visibility_timeout)
  result.blank? ? nil : result[0]
rescue
  on_exception
end
Retrieves a list of messages from the queue. Returns an array of hashes in the format {:id=>'message_id', :body=>'message_body'}.
sqs.receive_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue',10, 5) #=> [{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]
P.S. Usually returns fewer messages than requested even if they are available.
# File lib/sqs/right_sqs_interface.rb, line 297
def receive_messages(queue_url, number_of_messages=1, visibility_timeout=nil)
  return [] if number_of_messages == 0
  req_hash = generate_rest_request('GET',
                                   'NumberOfMessages'  => number_of_messages,
                                   'VisibilityTimeout' => visibility_timeout,
                                   :queue_url          => "#{queue_url}/front" )
  request_info(req_hash, SqsReceiveMessagesParser.new(:logger => @logger))
rescue
  on_exception
end
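Since a single call usually returns fewer messages than requested, callers that need a batch of a given size tend to poll in a bounded loop. The following is a minimal sketch of that idea, not a recommended implementation. SparseFakeSqs and receive_up_to are hypothetical, and the fake's limit of two messages per call is an arbitrary assumption chosen to imitate short responses.

```ruby
# Stand-in that never returns more than two messages per call
# (an assumption for illustration only).
class SparseFakeSqs
  def initialize(bodies)
    @pending = bodies.each_with_index.map { |body, i| { :id => "id-#{i}", :body => body } }
  end

  def receive_messages(_queue_url, number_of_messages = 1, _visibility_timeout = nil)
    @pending.shift([number_of_messages, 2].min)
  end
end

# Hypothetical helper: keeps asking for the remainder until enough messages
# have arrived or max_attempts calls have been made.
def receive_up_to(client, queue_url, wanted, max_attempts = 5)
  collected = []
  max_attempts.times do
    break if collected.size >= wanted
    collected.concat(client.receive_messages(queue_url, wanted - collected.size))
  end
  collected
end

client = SparseFakeSqs.new(%w[message_1 message_2 message_3 message_4 message_5])
p receive_up_to(client, 'my_awesome_queue', 5).map { |m| m[:body] }
```

The attempt limit keeps the loop from spinning forever when the queue holds fewer messages than requested.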
Revokes a permission from a user. Returns true or raises an exception.
sqs.remove_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'my_awesome_friend@gmail.com', 'FULLCONTROL') #=> true
# File lib/sqs/right_sqs_interface.rb, line 279
def remove_grant(queue_url, grantee_email_address_or_id, permission = nil)
  grantee_key = grantee_email_address_or_id.include?('@') ? 'Grantee.EmailAddress' : 'Grantee.ID'
  req_hash = generate_request('RemoveGrant',
                              grantee_key  => grantee_email_address_or_id,
                              'Permission' => permission,
                              :queue_url   => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
Sends a new message to the queue. Returns 'message_id' or raises an exception.
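remove_grant accepts either an email address or a grantee id and picks the request parameter name by looking for an '@'. A short sketch of that selection follows; grantee_param is a hypothetical helper and the id shown is a made-up placeholder.

```ruby
# Hypothetical helper, not part of RightAws: returns the request parameter
# that would identify the grantee.
def grantee_param(grantee_email_address_or_id)
  key = grantee_email_address_or_id.include?('@') ? 'Grantee.EmailAddress' : 'Grantee.ID'
  { key => grantee_email_address_or_id }
end

p grantee_param('my_awesome_friend@gmail.com')
p grantee_param('0000000000000000000000011111111111')
```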
sqs.send_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'message_1') #=> "1234567890...0987654321"
# File lib/sqs/right_sqs_interface.rb, line 325
def send_message(queue_url, message)
  req_hash = generate_rest_request('PUT',
                                   :message   => message,
                                   :queue_url => "#{queue_url}/back")
  request_info(req_hash, SqsSendMessagesParser.new(:logger => @logger))
rescue
  on_exception
end
Sets a queue attribute. Returns true or raises an exception.
sqs.set_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', "VisibilityTimeout", 10) #=> true
P.S. Amazon returns success even if the attribute does not exist. Also, updated attribute values may take some time to become visible to other queries (see the SQS documentation for semantics).
# File lib/sqs/right_sqs_interface.rb, line 197
def set_queue_attributes(queue_url, attribute, value)
  req_hash = generate_request('SetQueueAttributes',
                              'Attribute' => attribute,
                              'Value'     => value,
                              :queue_url  => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end
Sets the visibility timeout. Returns true or raises an exception.
sqs.set_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 15) #=> true
See also: set_queue_attributes
# File lib/sqs/right_sqs_interface.rb, line 213
def set_visibility_timeout(queue_url, visibility_timeout=nil)
  req_hash = generate_request('SetVisibilityTimeout',
                              'VisibilityTimeout' => visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT,
                              :queue_url          => queue_url )
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end