Drizzled Public API Documentation

zeromq_log.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2011 Marcus Eriksson
5  *
6  * Authors:
7  *
8  * Marcus Eriksson <krummas@gmail.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23  */
24 
25 #include <config.h>
26 #include "zeromq_log.h"
27 #include <drizzled/message/transaction.pb.h>
28 #include <google/protobuf/io/coded_stream.h>
29 #include <stdio.h>
30 #include <drizzled/module/registry.h>
31 #include <drizzled/plugin.h>
32 #include <stdint.h>
33 #include <boost/program_options.hpp>
35 #include <zmq.h>
36 
37 namespace po= boost::program_options;
38 
39 using namespace std;
40 using namespace drizzled;
41 using namespace google;
42 
43 namespace drizzle_plugin
44 {
45 
46 ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
47  plugin::TransactionApplier(name)
48 {
49  void *context= zmq_init(1);
50  _socket= zmq_socket (context, ZMQ_PUB);
51  assert (_socket);
52  int rc= zmq_bind (_socket, endpoint.c_str());
53  assert (rc == 0);
54  pthread_mutex_init(&publishLock, NULL);
55 }
56 
57 ZeroMQLog::~ZeroMQLog()
58 {
59  zmq_close(_socket);
60  pthread_mutex_destroy(&publishLock);
61 }
62 
63 plugin::ReplicationReturnCode
65 {
66  size_t message_byte_length= to_apply.ByteSize();
67  uint8_t* buffer= new uint8_t[message_byte_length];
68  if(buffer == NULL)
69  {
70  errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
71  deactivate();
72  return plugin::UNKNOWN_ERROR;
73  }
74 
75  string schema= getSchemaName(to_apply);
76  zmq_msg_t schemamsg;
77  int rc= zmq_msg_init_size(&schemamsg, schema.length());
78  memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
79 
80  to_apply.SerializeWithCachedSizesToArray(buffer);
81  zmq_msg_t msg;
82  rc= zmq_msg_init_size(&msg, message_byte_length);
83  assert (rc == 0);
84  memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
85 
86  // need a mutex around this since several threads can call this method at the same time
87  pthread_mutex_lock(&publishLock);
88  rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
89  rc= zmq_send(_socket, &msg, 0);
90  pthread_mutex_unlock(&publishLock);
91 
92  zmq_msg_close(&msg);
93  zmq_msg_close(&schemamsg);
94  delete[] buffer;
95  return plugin::SUCCESS;
96 }
97 
98 string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
99  if(txn.statement_size() == 0) return "";
100 
101  const message::Statement &statement= txn.statement(0);
102 
103  switch(statement.type())
104  {
105  case message::Statement::INSERT:
106  return statement.insert_header().table_metadata().schema_name();
107  case message::Statement::UPDATE:
108  return statement.update_header().table_metadata().schema_name();
109  case message::Statement::DELETE:
110  return statement.delete_header().table_metadata().schema_name();
111  case message::Statement::CREATE_TABLE:
112  return statement.create_table_statement().table().schema();
113  case message::Statement::TRUNCATE_TABLE:
114  return statement.truncate_table_statement().table_metadata().schema_name();
115  case message::Statement::DROP_TABLE:
116  return statement.drop_table_statement().table_metadata().schema_name();
117  case message::Statement::CREATE_SCHEMA:
118  return statement.create_schema_statement().schema().name();
119  case message::Statement::DROP_SCHEMA:
120  return statement.drop_schema_statement().schema_name();
121  default:
122  return "";
123  }
124 }
125 
126 static ZeroMQLog *zeromqLogger;
127 
131 static int init(drizzled::module::Context &context)
132 {
133  const module::option_map &vm= context.getOptions();
134  zeromqLogger= new ZeroMQLog("zeromq_applier", vm["endpoint"].as<string>());
135  context.add(zeromqLogger);
136  ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
137  context.registerVariable(new sys_var_const_string_val("endpoint", vm["endpoint"].as<string>()));
138  return 0;
139 }
140 
141 
142 static void init_options(drizzled::module::option_context &context)
143 {
144  context("endpoint",
145  po::value<string>()->default_value("tcp://*:9999"),
146  _("End point to bind to"));
147  context("use-replicator",
148  po::value<string>()->default_value("default_replicator"),
149  _("Name of the replicator plugin to use (default='default_replicator')"));
150 
151 }
152 
153 } /* namespace drizzle_plugin */
154 
155 DRIZZLE_DECLARE_PLUGIN
156 {
157  DRIZZLE_VERSION_ID,
158  "zeromq",
159  "0.1",
160  "Marcus Eriksson",
161  N_("Publishes transactions to ZeroMQ"),
162  PLUGIN_LICENSE_GPL,
163  drizzle_plugin::init,
164  NULL,
165  drizzle_plugin::init_options,
166 }
167 DRIZZLE_DECLARE_PLUGIN_END;