Drizzled Public API Documentation

queue_producer.cc
1 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2011 David Shrewsbury
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include <plugin/slave/queue_producer.h>
24 #include <drizzled/errmsg_print.h>
25 #include <drizzled/sql/result_set.h>
26 #include <drizzled/execute.h>
27 #include <drizzled/gettext.h>
28 #include <drizzled/message/transaction.pb.h>
29 #include <boost/lexical_cast.hpp>
30 #include <google/protobuf/text_format.h>
31 #include <string>
32 #include <vector>
33 
34 using namespace std;
35 using namespace drizzled;
36 
37 namespace slave
38 {
39 
40 QueueProducer::~QueueProducer()
41 {
42  if (_is_connected)
43  closeConnection();
44 }
45 
46 bool QueueProducer::init()
47 {
48  setIOState("", true);
49  return reconnect(true);
50 }
51 
52 bool QueueProducer::process()
53 {
54  if (_saved_max_commit_id == 0)
55  {
56  if (not queryForMaxCommitId(&_saved_max_commit_id))
57  {
58  if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
59  {
60  if (reconnect(false))
61  {
62  return true; /* reconnect successful, try again */
63  }
64  else
65  {
66  _last_error_message= "Master offline";
67  return false; /* reconnect failed, shutdown the thread */
68  }
69  }
70  else
71  {
72  return false; /* unrecoverable error, shutdown the thread */
73  }
74  }
75  }
76 
77  /* Keep getting events until caught up */
78  enum drizzled::error_t err;
79  while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
80  {}
81 
82  if (err == ER_YES) /* We encountered an error */
83  {
84  if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
85  {
86  if (reconnect(false))
87  {
88  return true; /* reconnect successful, try again */
89  }
90  else
91  {
92  _last_error_message= "Master offline";
93  return false; /* reconnect failed, shutdown the thread */
94  }
95  }
96  else
97  {
98  return false; /* unrecoverable error, shutdown the thread */
99  }
100  }
101 
102  return true;
103 }
104 
105 void QueueProducer::shutdown()
106 {
107  setIOState(_last_error_message, false);
108  if (_is_connected)
109  closeConnection();
110 }
111 
112 bool QueueProducer::reconnect(bool initial_connection)
113 {
114  if (not initial_connection)
115  {
116  errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
117  }
118 
119  _is_connected= false;
120  _last_return= DRIZZLE_RETURN_OK;
121  _last_error_message.clear();
122  boost::posix_time::seconds duration(_seconds_between_reconnects);
123 
124  uint32_t attempts= 1;
125 
126  setIOState("Connecting...", true);
127 
128  while (not openConnection())
129  {
130  char buf[250];
131  snprintf(buf, sizeof(buf),_("Connection attempt %d of %d failed, sleeping for %d seconds and retrying. %s"), attempts, _max_reconnects, _seconds_between_reconnects, _last_error_message.c_str());
132 
133  setIOState(buf, true);
134  if (attempts++ == _max_reconnects)
135  break;
136  boost::this_thread::sleep(duration);
137  }
138 
139  setIOState(_is_connected ? _("Connected") : _("Disconnected"), true);
140 
141  return _is_connected;
142 }
143 
144 bool QueueProducer::openConnection()
145 {
146  if ((_drizzle= drizzle_create()) == NULL)
147  {
148  _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
149  _last_error_message= "Replication slave: ";
150  _last_error_message.append(drizzle_error(_drizzle));
151  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
152  return false;
153  }
154 
155  if ((_connection= drizzle_con_create(_drizzle)) == NULL)
156  {
157  _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
158  _last_error_message= "Replication slave: ";
159  _last_error_message.append(drizzle_error(_drizzle));
160  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
161  return false;
162  }
163 
164  drizzle_con_set_tcp(_connection, _master_host.c_str(), _master_port);
165  drizzle_con_set_auth(_connection, _master_user.c_str(), _master_pass.c_str());
166 
167  drizzle_return_t ret= drizzle_con_connect(_connection);
168 
169  if (ret != DRIZZLE_RETURN_OK)
170  {
171  _last_return= ret;
172  _last_error_message= "Replication slave: ";
173  _last_error_message.append(drizzle_error(_drizzle));
174  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
175  return false;
176  }
177 
178  _is_connected= true;
179 
180  return true;
181 }
182 
183 bool QueueProducer::closeConnection()
184 {
185  drizzle_return_t ret;
186  drizzle_result_st result;
187 
188  _is_connected= false;
189 
190  if (drizzle_quit(_connection, &result, &ret) == NULL)
191  {
192  _last_return= ret;
193  drizzle_result_free(&result);
194  return false;
195  }
196 
197  drizzle_result_free(&result);
198 
199  return true;
200 }
201 
202 bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
203 {
204  /*
205  * This SQL will get the maximum commit_id value we have pulled over from
206  * the master. We query two tables because either the queue will be empty,
207  * in which case the last_applied_commit_id will be the value we want, or
208  * we have yet to drain the queue, we get the maximum value still in
209  * the queue.
210  */
211  string sql("SELECT MAX(x.cid) FROM"
212  " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
213  " WHERE `master_id` = "
214  + boost::lexical_cast<string>(masterId())
215  + " UNION ALL SELECT `last_applied_commit_id` AS cid"
216  + " FROM `sys_replication`.`applier_state` WHERE `master_id` = "
217  + boost::lexical_cast<string>(masterId())
218  + ") AS x");
219 
220  sql::ResultSet result_set(1);
221  Execute execute(*(_session.get()), true);
222  execute.run(sql, result_set);
223  assert(result_set.getMetaData().getColumnCount() == 1);
224 
225  /* Really should only be 1 returned row */
226  uint32_t found_rows= 0;
227  while (result_set.next())
228  {
229  string value= result_set.getString(0);
230 
231  if ((value == "") || (found_rows == 1))
232  break;
233 
234  assert(result_set.isNull(0) == false);
235  *max_commit_id= boost::lexical_cast<uint64_t>(value);
236  found_rows++;
237  }
238 
239  if (found_rows == 0)
240  {
241  _last_error_message= "Could not determine last committed transaction.";
242  return false;
243  }
244 
245  return true;
246 }
247 
248 bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
249  vector<uint64_t> &list)
250 {
251  (void)list;
252  string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
253  " WHERE `commit_id` > ");
254  sql.append(boost::lexical_cast<string>(max_commit_id));
255  sql.append(" AND `originating_server_uuid` != ");
256  sql.append("'", 1);
257  sql.append(_session.get()->getServerUUID());
258  sql.append("'", 1);
259  sql.append(" ORDER BY `commit_id` LIMIT 25");
260 
261  drizzle_return_t ret;
262  drizzle_result_st result;
263  drizzle_query_str(_connection, &result, sql.c_str(), &ret);
264 
265  if (ret != DRIZZLE_RETURN_OK)
266  {
267  _last_return= ret;
268  _last_error_message= "Replication slave: ";
269  _last_error_message.append(drizzle_error(_drizzle));
270  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
271  drizzle_result_free(&result);
272  return false;
273  }
274 
275  ret= drizzle_result_buffer(&result);
276 
277  if (ret != DRIZZLE_RETURN_OK)
278  {
279  _last_return= ret;
280  _last_error_message= "Replication slave: ";
281  _last_error_message.append(drizzle_error(_drizzle));
282  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
283  drizzle_result_free(&result);
284  return false;
285  }
286 
287  drizzle_row_t row;
288 
289  while ((row= drizzle_row_next(&result)) != NULL)
290  {
291  if (row[0])
292  {
293  list.push_back(boost::lexical_cast<uint32_t>(row[0]));
294  }
295  else
296  {
297  _last_return= ret;
298  _last_error_message= "Replication slave: Unexpected NULL for trx id";
299  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
300  drizzle_result_free(&result);
301  return false;
302  }
303  }
304 
305  drizzle_result_free(&result);
306  return true;
307 }
308 
309 
310 bool QueueProducer::queueInsert(const char *trx_id,
311  const char *seg_id,
312  const char *commit_id,
313  const char *originating_server_uuid,
314  const char *originating_commit_id,
315  const char *msg,
316  const char *msg_length)
317 {
318  message::Transaction message;
319 
320  message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
321 
322  /*
323  * The SQL to insert our results into the local queue.
324  */
325  string sql= "INSERT INTO `sys_replication`.`queue`"
326  " (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
327  " `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
328  sql.append(boost::lexical_cast<string>(masterId()));
329  sql.append(", ", 2);
330  sql.append(trx_id);
331  sql.append(", ", 2);
332  sql.append(seg_id);
333  sql.append(", ", 2);
334  sql.append(commit_id);
335  sql.append(", '", 3);
336  sql.append(originating_server_uuid);
337  sql.append("' , ", 4);
338  sql.append(originating_commit_id);
339  sql.append(", '", 3);
340 
341  /*
342  * Ideally we would store the Transaction message in binary form, as it
343  * it stored on the master and tranferred to the slave. However, we are
344  * inserting using drizzle::Execute which doesn't really handle binary
345  * data. Until that is changed, we store as plain text.
346  */
347  string message_text;
348  google::protobuf::TextFormat::PrintToString(message, &message_text);
349 
350  /*
351  * Execution using drizzled::Execute requires some special escaping.
352  */
353  string::iterator it= message_text.begin();
354  for (; it != message_text.end(); ++it)
355  {
356  if (*it == '\"')
357  {
358  it= message_text.insert(it, '\\');
359  ++it;
360  }
361  else if (*it == '\'')
362  {
363  it= message_text.insert(it, '\\');
364  ++it;
365  it= message_text.insert(it, '\\');
366  ++it;
367  }
368  else if (*it == '\\')
369  {
370  it= message_text.insert(it, '\\');
371  ++it;
372  it= message_text.insert(it, '\\');
373  ++it;
374  it= message_text.insert(it, '\\');
375  ++it;
376  }
377  else if (*it == ';')
378  {
379  it= message_text.insert(it, '\\');
380  ++it; /* advance back to the semicolon */
381  }
382  }
383 
384  sql.append(message_text);
385  sql.append("')", 2);
386 
387  vector<string> statements;
388  statements.push_back(sql);
389 
390  if (not executeSQL(statements))
391  {
392  markInErrorState();
393  return false;
394  }
395 
396  uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
397  if (tmp_commit_id > _saved_max_commit_id)
398  _saved_max_commit_id= tmp_commit_id;
399 
400  return true;
401 }
402 
403 
404 enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
405 {
406  vector<uint64_t> trx_id_list;
407 
408  if (not queryForTrxIdList(max_commit_id, trx_id_list))
409  return ER_YES;
410 
411  if (trx_id_list.size() == 0) /* nothing to get from the master */
412  {
413  return ER_NO;
414  }
415 
416  /*
417  * The SQL to pull everything we need from the master.
418  */
419  string sql= "SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`,"
420  " `originating_commit_id`, `message`, `message_len` "
421  " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
422 
423  for (size_t x= 0; x < trx_id_list.size(); x++)
424  {
425  if (x > 0)
426  sql.append(", ", 2);
427  sql.append(boost::lexical_cast<string>(trx_id_list[x]));
428  }
429 
430  sql.append(")", 1);
431  sql.append(" ORDER BY `commit_id` ASC");
432 
433  drizzle_return_t ret;
434  drizzle_result_st result;
435  drizzle_query_str(_connection, &result, sql.c_str(), &ret);
436 
437  if (ret != DRIZZLE_RETURN_OK)
438  {
439  _last_return= ret;
440  _last_error_message= "Replication slave: ";
441  _last_error_message.append(drizzle_error(_drizzle));
442  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
443  drizzle_result_free(&result);
444  return ER_YES;
445  }
446 
447  /* TODO: Investigate 1-row-at-a-time buffering */
448 
449  ret= drizzle_result_buffer(&result);
450 
451  if (ret != DRIZZLE_RETURN_OK)
452  {
453  _last_return= ret;
454  _last_error_message= "Replication slave: ";
455  _last_error_message.append(drizzle_error(_drizzle));
456  errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
457  drizzle_result_free(&result);
458  return ER_YES;
459  }
460 
461  drizzle_row_t row;
462 
463  while ((row= drizzle_row_next(&result)) != NULL)
464  {
465  if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
466  {
467  errmsg_printf(error::ERROR,
468  _("Replication slave: Unable to insert into queue."));
469  drizzle_result_free(&result);
470  return ER_YES;
471  }
472  }
473 
474  drizzle_result_free(&result);
475 
476  return EE_OK;
477 }
478 
479 
480 void QueueProducer::setIOState(const string &err_msg, bool status)
481 {
482  vector<string> statements;
483  string sql;
484  string msg(err_msg);
485 
486  if (not status)
487  {
488  sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
489  }
490  else
491  {
492  sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
493  }
494 
495  sql.append(", `error_msg` = '", 17);
496 
497  /* Escape embedded quotes and statement terminators */
498  string::iterator it;
499  for (it= msg.begin(); it != msg.end(); ++it)
500  {
501  if (*it == '\'')
502  {
503  it= msg.insert(it, '\'');
504  ++it; /* advance back to the quote */
505  }
506  else if (*it == ';')
507  {
508  it= msg.insert(it, '\\');
509  ++it; /* advance back to the semicolon */
510  }
511  }
512 
513  sql.append(msg);
514  sql.append("' WHERE `master_id` = ");
515  sql.append(boost::lexical_cast<string>(masterId()));
516 
517  statements.push_back(sql);
518  executeSQL(statements);
519 }
520 
521 } /* namespace slave */