Drizzled Public API Documentation

rabbitmq_handler.cc
1 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2010 Marcus Eriksson
5  *
6  * Authors:
7  *
8  * Marcus Eriksson <krummas@gmail.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
25 
26 #include <config.h>
27 
28 #include <unistd.h>
29 
30 #include <drizzled/gettext.h>
31 
32 #include "rabbitmq_handler.h"
33 
34 using namespace std;
35 
36 namespace drizzle_plugin
37 {
38 
39 RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost,
40  const in_port_t rabbitMQPort,
41  const std::string &rabbitMQUsername,
42  const std::string &rabbitMQPassword,
43  const std::string &rabbitMQVirtualhost,
44  const std::string &rabbitMQExchange,
45  const std::string &rabbitMQRoutingKey)
47  rabbitmqConnection(amqp_new_connection()),
48  hostname(rabbitMQHost),
49  port(rabbitMQPort),
50  username(rabbitMQUsername),
51  password(rabbitMQPassword),
52  virtualhost(rabbitMQVirtualhost),
53  exchange(rabbitMQExchange),
54  routingKey(rabbitMQRoutingKey)
55 {
56  pthread_mutex_init(&publishLock, NULL);
57  connect();
58 }
59 
60 RabbitMQHandler::~RabbitMQHandler()
61 {
62  pthread_mutex_destroy(&publishLock);
63  disconnect();
64 }
65 
66 void RabbitMQHandler::publish(void *message,
67  const int length)
69 {
70  pthread_mutex_lock(&publishLock);
71  amqp_bytes_t b;
72  b.bytes= message;
73  b.len= length;
74 
75  if (amqp_basic_publish(rabbitmqConnection,
76  1,
77  amqp_cstring_bytes(exchange.c_str()),
78  amqp_cstring_bytes(routingKey.c_str()),
79  0,
80  0,
81  NULL,
82  b) < 0)
83  {
84  pthread_mutex_unlock(&publishLock);
85  throw rabbitmq_handler_exception("Could not publish message");
86  }
87  pthread_mutex_unlock(&publishLock);
88 
89 }
90 
91 void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
92 {
93  disconnect();
94  connect();
95 }
96 
97 void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception)
98 {
99  try
100  {
101  handleAMQPError(amqp_channel_close(rabbitmqConnection,
102  1,
103  AMQP_REPLY_SUCCESS),
104  "close channel");
105  handleAMQPError(amqp_connection_close(rabbitmqConnection,
106  AMQP_REPLY_SUCCESS),
107  "close connection");
108  amqp_destroy_connection(rabbitmqConnection);
109  }
110  catch(exception& e) {} // do not throw in destructorn
111  close(sockfd);
112 }
113 
114 void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
115  sockfd = amqp_open_socket(hostname.c_str(), port);
116  if(sockfd < 0)
117  {
118  throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
119  }
120  amqp_set_sockfd(rabbitmqConnection, sockfd);
121  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
122  handleAMQPError(amqp_login(rabbitmqConnection,
123  virtualhost.c_str(),
124  0,
125  131072,
126  0,
127  AMQP_SASL_METHOD_PLAIN,
128  username.c_str(),
129  password.c_str()),
130  "rabbitmq login");
131  /* open the channel */
132  amqp_channel_open(rabbitmqConnection, 1);
133  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
134  amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
135  amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
136  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
137 }
138 
139 void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
140 {
141  string errorMessage("");
142  switch (x.reply_type) {
143  case AMQP_RESPONSE_NORMAL:
144  break;
145  case AMQP_RESPONSE_NONE:
146  errorMessage.assign("No response in ");
147  errorMessage.append(context);
148  throw rabbitmq_handler_exception(errorMessage);
149  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
150  case AMQP_RESPONSE_SERVER_EXCEPTION:
151  switch (x.reply.id) {
152  case AMQP_CONNECTION_CLOSE_METHOD:
153  errorMessage.assign("Connection closed in ");
154  errorMessage.append(context);
155  throw rabbitmq_handler_exception(errorMessage);
156  case AMQP_CHANNEL_CLOSE_METHOD:
157  errorMessage.assign("Channel closed in ");
158  errorMessage.append(context);
159  throw rabbitmq_handler_exception(errorMessage);
160  default:
161  errorMessage.assign("Unknown error in ");
162  errorMessage.append(context);
163  throw rabbitmq_handler_exception(errorMessage);
164  }
165  }
166 }
167 
168 } /* namespace drizzle_plugin */