Drizzled Public API Documentation

replication_slave.cc
1 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2011 David Shrewsbury
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 #include <plugin/slave/replication_slave.h>
23 #include <drizzled/errmsg_print.h>
24 #include <drizzled/program_options/config_file.h>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/program_options.hpp>
27 #include <fstream>
28 #include <drizzled/plugin.h>
29 
30 using namespace std;
31 using namespace drizzled;
32 
33 namespace po= boost::program_options;
34 
35 namespace slave
36 {
37 
38 void ReplicationSlave::startup(Session &session)
39 {
40  (void)session;
41  if (not initWithConfig())
42  {
43  errmsg_printf(error::ERROR, _("Could not start slave services: %s\n"),
44  _error.c_str());
45  }
46  else
47  {
48  /* Start the IO threads */
49  boost::unordered_map<uint32_t, Master *>::const_iterator it;
50  for (it= _masters.begin(); it != _masters.end(); ++it)
51  {
52  it->second->start();
53  /* Consumer must know server IDs */
54  _consumer.addMasterId(it->first);
55  }
56 
57  _consumer_thread= boost::thread(&QueueConsumer::run, &_consumer);
58  }
59 }
60 
61 bool ReplicationSlave::initWithConfig()
62 {
63  po::variables_map vm;
64  po::options_description slave_options("Options for the slave plugin");
65 
66  /* Common options */
67  slave_options.add_options()
68  ("seconds-between-reconnects", po::value<uint32_t>()->default_value(30))
69  ("io-thread-sleep", po::value<uint32_t>()->default_value(5))
70  ("applier-thread-sleep", po::value<uint32_t>()->default_value(5))
71  ("ignore-errors", po::value<bool>()->default_value(false)->zero_tokens());
72 
73  /* Master defining options */
74  for (size_t num= 1; num <= 10; num++)
75  {
76  string section("master");
77  section.append(boost::lexical_cast<string>(num));
78  slave_options.add_options()
79  ((section + ".master-host").c_str(), po::value<string>()->default_value(""))
80  ((section + ".master-port").c_str(), po::value<uint16_t>()->default_value(3306))
81  ((section + ".master-user").c_str(), po::value<string>()->default_value(""))
82  ((section + ".master-pass").c_str(), po::value<string>()->default_value(""))
83  ((section + ".max-reconnects").c_str(), po::value<uint32_t>()->default_value(10))
84  ((section + ".max-commit-id").c_str(), po::value<uint64_t>());
85  }
86 
87  ifstream cf_stream(_config_file.c_str());
88 
89  if (not cf_stream.is_open())
90  {
91  _error= "Unable to open file ";
92  _error.append(_config_file);
93  return false;
94  }
95 
96  po::store(drizzled::program_options::parse_config_file(cf_stream, slave_options), vm);
97 
98  po::notify(vm);
99 
100  /*
101  * We will support 10 masters. This loope effectively creates the Master
102  * objects as they are referenced.
103  *
104  * @todo Support a variable number of master hosts.
105  */
106  for (size_t num= 1; num <= 10; num++)
107  {
108  string section("master");
109  section.append(boost::lexical_cast<string>(num));
110 
111  /* WARNING! Hack!
112  * We need to be able to determine when a master host is actually defined
113  * by the user vs. we are just using defaults. So if the hostname is ever
114  * the default value of "", then we'll assume that this section was not
115  * user defined.
116  */
117  if (vm[section + ".master-host"].as<string>() == "")
118  continue;
119 
120  _masters[num]= new (std::nothrow) Master(num);
121 
122  if (vm.count(section + ".master-host"))
123  master(num).producer().setMasterHost(vm[section + ".master-host"].as<string>());
124 
125  if (vm.count(section + ".master-port"))
126  master(num).producer().setMasterPort(vm[section + ".master-port"].as<uint16_t>());
127 
128  if (vm.count(section + ".master-user"))
129  master(num).producer().setMasterUser(vm[section + ".master-user"].as<string>());
130 
131  if (vm.count(section + ".master-pass"))
132  master(num).producer().setMasterPassword(vm[section + ".master-pass"].as<string>());
133 
134  if (vm.count(section + ".max-commit-id"))
135  master(num).producer().setCachedMaxCommitId(vm[section + ".max-commit-id"].as<uint64_t>());
136  }
137 
138  boost::unordered_map<uint32_t, Master *>::const_iterator it;
139 
140  for (it= _masters.begin(); it != _masters.end(); ++it)
141  {
142  if (vm.count("max-reconnects"))
143  it->second->producer().setMaxReconnectAttempts(vm["max-reconnects"].as<uint32_t>());
144 
145  if (vm.count("seconds-between-reconnects"))
146  it->second->producer().setSecondsBetweenReconnects(vm["seconds-between-reconnects"].as<uint32_t>());
147 
148  if (vm.count("io-thread-sleep"))
149  it->second->producer().setSleepInterval(vm["io-thread-sleep"].as<uint32_t>());
150  }
151 
152  if (vm.count("applier-thread-sleep"))
153  _consumer.setSleepInterval(vm["applier-thread-sleep"].as<uint32_t>());
154  if (vm.count("ignore-errors"))
155  _consumer.setIgnoreErrors(vm["ignore-errors"].as<bool>());
156 
157  /* setup schema and tables */
159  if (not rs.create())
160  {
161  _error= rs.getErrorMessage();
162  return false;
163  }
164 
165  for (it= _masters.begin(); it != _masters.end(); ++it)
166  {
167  /* make certain a row exists for each master */
168  rs.createInitialApplierRow(it->first);
169  rs.createInitialIORow(it->first);
170 
171  uint64_t cachedValue= it->second->producer().cachedMaxCommitId();
172  if (cachedValue)
173  {
174  if (not rs.setInitialMaxCommitId(it->first, cachedValue))
175  {
176  _error= rs.getErrorMessage();
177  return false;
178  }
179  }
180  }
181 
182  return true;
183 }
184 
185 } /* namespace slave */