00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <config.h>
00026 #include "rabbitmq_log.h"
00027 #include <drizzled/message/transaction.pb.h>
00028 #include <google/protobuf/io/coded_stream.h>
00029 #include <stdio.h>
00030 #include <drizzled/module/registry.h>
00031 #include <drizzled/plugin.h>
00032 #include <stdint.h>
00033 #include "rabbitmq_handler.h"
00034 #include <boost/program_options.hpp>
00035 #include <drizzled/module/option_map.h>
00036
00037 namespace po= boost::program_options;
00038
00039 using namespace std;
00040 using namespace drizzled;
00041 using namespace google;
00042
00043 namespace drizzle_plugin
00044 {
00045
/**
 * Port of the RabbitMQ server.
 *
 * init_options() binds this variable to the "port" option (default 5672);
 * init() passes it to the RabbitMQHandler and registers it as the read-only
 * "port" variable.
 */
static port_constraint sysvar_rabbitmq_port;
00050
00051
00052 RabbitMQLog::RabbitMQLog(const string &name,
00053 const std::string &exchange,
00054 const std::string &routingkey,
00055 RabbitMQHandler* mqHandler) :
00056 plugin::TransactionApplier(name),
00057 _rabbitMQHandler(mqHandler),
00058 _exchange(exchange),
00059 _routingkey(routingkey)
00060 { }
00061
00062 RabbitMQLog::~RabbitMQLog()
00063 { }
00064
00065 plugin::ReplicationReturnCode
00066 RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
00067 {
00068 size_t message_byte_length= to_apply.ByteSize();
00069 uint8_t* buffer= new uint8_t[message_byte_length];
00070 if(buffer == NULL)
00071 {
00072 errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
00073 deactivate();
00074 return plugin::UNKNOWN_ERROR;
00075 }
00076
00077 to_apply.SerializeWithCachedSizesToArray(buffer);
00078 try
00079 {
00080 _rabbitMQHandler->publish(buffer,
00081 int(message_byte_length),
00082 _exchange,
00083 _routingkey);
00084 }
00085 catch(exception& e)
00086 {
00087 errmsg_printf(error::ERROR, _(e.what()));
00088 deactivate();
00089 return plugin::UNKNOWN_ERROR;
00090 }
00091 delete[] buffer;
00092 return plugin::SUCCESS;
00093 }
00094
00095 static RabbitMQLog *rabbitmqLogger;
00096 static RabbitMQHandler* rabbitmqHandler;
00097
00103 static int init(drizzled::module::Context &context)
00104 {
00105 const module::option_map &vm= context.getOptions();
00106
00107 try
00108 {
00109 rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
00110 sysvar_rabbitmq_port,
00111 vm["username"].as<string>(),
00112 vm["password"].as<string>(),
00113 vm["virtualhost"].as<string>());
00114 }
00115 catch (exception& e)
00116 {
00117 errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQHandler. Got error: %s\n"),
00118 e.what());
00119 return 1;
00120 }
00121 try
00122 {
00123 rabbitmqLogger= new RabbitMQLog("rabbit_log_applier",
00124 vm["exchange"].as<string>(),
00125 vm["routingkey"].as<string>(),
00126 rabbitmqHandler);
00127 }
00128 catch (exception& e)
00129 {
00130 errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
00131 e.what());
00132 return 1;
00133 }
00134
00135 context.add(rabbitmqLogger);
00136 ReplicationServices &replication_services= ReplicationServices::singleton();
00137 replication_services.attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
00138
00139 context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
00140 context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
00141 context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
00142 context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
00143 context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
00144 context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
00145 context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
00146
00147 return 0;
00148 }
00149
00150
00151 static void init_options(drizzled::module::option_context &context)
00152 {
00153 context("host",
00154 po::value<string>()->default_value("localhost"),
00155 _("Host name to connect to"));
00156 context("port",
00157 po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
00158 _("Port to connect to"));
00159 context("virtualhost",
00160 po::value<string>()->default_value("/"),
00161 _("RabbitMQ virtualhost"));
00162 context("username",
00163 po::value<string>()->default_value("guest"),
00164 _("RabbitMQ username"));
00165 context("password",
00166 po::value<string>()->default_value("guest"),
00167 _("RabbitMQ password"));
00168 context("use-replicator",
00169 po::value<string>()->default_value("default_replicator"),
00170 _("Name of the replicator plugin to use (default='default_replicator')"));
00171 context("exchange",
00172 po::value<string>()->default_value("ReplicationExchange"),
00173 _("Name of RabbitMQ exchange to publish to"));
00174 context("routingkey",
00175 po::value<string>()->default_value("ReplicationRoutingKey"),
00176 _("Name of RabbitMQ routing key to use"));
00177 }
00178
00179 }
00180
/* Plugin declaration: names init() and init_options() as this plugin's entry
   points.  NOTE(review): the meaning of the NULL middle argument is defined
   by the DRIZZLE_PLUGIN macro, which is not visible here -- confirm. */
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);
00182