00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <config.h>
00026
00027 #include <drizzled/gettext.h>
00028
00029 #include "rabbitmq_handler.h"
00030
00031 using namespace std;
00032
00033 namespace drizzle_plugin
00034 {
00035
00036 RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost,
00037 const in_port_t rabbitMQPort,
00038 const std::string &rabbitMQUsername,
00039 const std::string &rabbitMQPassword,
00040 const std::string &rabbitMQVirtualhost)
00041 throw(rabbitmq_handler_exception) :
00042 rabbitmqConnection(amqp_new_connection()),
00043 sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
00044 hostname(rabbitMQHost),
00045 port(rabbitMQPort),
00046 username(rabbitMQUsername),
00047 password(rabbitMQPassword),
00048 virtualhost(rabbitMQVirtualhost)
00049 {
00050
00051 if(sockfd < 0)
00052 {
00053 throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
00054 }
00055 amqp_set_sockfd(rabbitmqConnection, sockfd);
00056
00057 handleAMQPError(amqp_login(rabbitmqConnection,
00058 virtualhost.c_str(),
00059 0,
00060 131072,
00061 0,
00062 AMQP_SASL_METHOD_PLAIN,
00063 username.c_str(),
00064 password.c_str()),
00065 "rabbitmq login");
00066
00067 amqp_channel_open(rabbitmqConnection, 1);
00068 }
00069
00070 RabbitMQHandler::~RabbitMQHandler()
00071 {
00072 try
00073 {
00074 handleAMQPError(amqp_channel_close(rabbitmqConnection,
00075 1,
00076 AMQP_REPLY_SUCCESS),
00077 "close channel");
00078 handleAMQPError(amqp_connection_close(rabbitmqConnection,
00079 AMQP_REPLY_SUCCESS),
00080 "close connection");
00081 amqp_destroy_connection(rabbitmqConnection);
00082 }
00083 catch(exception& e) {}
00084
00085 close(sockfd);
00086 }
00087
00088 void RabbitMQHandler::publish(void *message,
00089 const int length,
00090 const std::string &exchangeName,
00091 const std::string &routingKey)
00092 throw(rabbitmq_handler_exception)
00093 {
00094 amqp_bytes_t b;
00095 b.bytes= message;
00096 b.len= length;
00097
00098 if (amqp_basic_publish(rabbitmqConnection,
00099 1,
00100 amqp_cstring_bytes(exchangeName.c_str()),
00101 amqp_cstring_bytes(routingKey.c_str()),
00102 0,
00103 0,
00104 NULL,
00105 b) < 0)
00106 {
00107 throw rabbitmq_handler_exception("Could not publish message");
00108 }
00109
00110 }
00111
00112 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
00113 {
00114 string errorMessage("");
00115 switch (x.reply_type) {
00116 case AMQP_RESPONSE_NORMAL:
00117 break;
00118 case AMQP_RESPONSE_NONE:
00119 errorMessage.assign("No response in ");
00120 errorMessage.append(context);
00121 throw rabbitmq_handler_exception(errorMessage);
00122 case AMQP_RESPONSE_LIBRARY_EXCEPTION:
00123 case AMQP_RESPONSE_SERVER_EXCEPTION:
00124 switch (x.reply.id) {
00125 case AMQP_CONNECTION_CLOSE_METHOD:
00126 errorMessage.assign("Connection closed in ");
00127 errorMessage.append(context);
00128 throw rabbitmq_handler_exception(errorMessage);
00129 case AMQP_CHANNEL_CLOSE_METHOD:
00130 errorMessage.assign("Channel closed in ");
00131 errorMessage.append(context);
00132 throw rabbitmq_handler_exception(errorMessage);
00133 default:
00134 errorMessage.assign("Unknown error in ");
00135 errorMessage.append(context);
00136 throw rabbitmq_handler_exception(errorMessage);
00137 }
00138 }
00139 }
00140
00141 }