00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022 #include <plugin/slave/queue_producer.h>
00023 #include <drizzled/errmsg_print.h>
00024 #include <drizzled/sql/result_set.h>
00025 #include <drizzled/execute.h>
00026 #include <drizzled/gettext.h>
00027 #include <drizzled/message/transaction.pb.h>
00028 #include <boost/lexical_cast.hpp>
00029 #include <google/protobuf/text_format.h>
00030
00031 using namespace std;
00032 using namespace drizzled;
00033
00034 namespace slave
00035 {
00036
00037 QueueProducer::~QueueProducer()
00038 {
00039 if (_is_connected)
00040 closeConnection();
00041 }
00042
00043 bool QueueProducer::init()
00044 {
00045 setIOState("", true);
00046 return reconnect(true);
00047 }
00048
00049 bool QueueProducer::process()
00050 {
00051 if (_saved_max_commit_id == 0)
00052 {
00053 if (not queryForMaxCommitId(&_saved_max_commit_id))
00054 {
00055 if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
00056 {
00057 if (reconnect(false))
00058 {
00059 return true;
00060 }
00061 else
00062 {
00063 _last_error_message= "Master offline";
00064 return false;
00065 }
00066 }
00067 else
00068 {
00069 return false;
00070 }
00071 }
00072 }
00073
00074
00075 enum drizzled::error_t err;
00076 while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
00077 {}
00078
00079 if (err == ER_YES)
00080 {
00081 if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
00082 {
00083 if (reconnect(false))
00084 {
00085 return true;
00086 }
00087 else
00088 {
00089 _last_error_message= "Master offline";
00090 return false;
00091 }
00092 }
00093 else
00094 {
00095 return false;
00096 }
00097 }
00098
00099 return true;
00100 }
00101
00102 void QueueProducer::shutdown()
00103 {
00104 setIOState(_last_error_message, false);
00105 if (_is_connected)
00106 closeConnection();
00107 }
00108
00109 bool QueueProducer::reconnect(bool initial_connection)
00110 {
00111 if (not initial_connection)
00112 {
00113 errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
00114 }
00115
00116 _is_connected= false;
00117 _last_return= DRIZZLE_RETURN_OK;
00118 _last_error_message.clear();
00119 boost::posix_time::seconds duration(_seconds_between_reconnects);
00120
00121 uint32_t attempts= 1;
00122
00123 while (not openConnection())
00124 {
00125 if (attempts++ == _max_reconnects)
00126 break;
00127 boost::this_thread::sleep(duration);
00128 }
00129
00130 return _is_connected;
00131 }
00132
00133 bool QueueProducer::openConnection()
00134 {
00135 if (drizzle_create(&_drizzle) == NULL)
00136 {
00137 _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
00138 _last_error_message= "Replication slave: ";
00139 _last_error_message.append(drizzle_error(&_drizzle));
00140 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00141 return false;
00142 }
00143
00144 if (drizzle_con_create(&_drizzle, &_connection) == NULL)
00145 {
00146 _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
00147 _last_error_message= "Replication slave: ";
00148 _last_error_message.append(drizzle_error(&_drizzle));
00149 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00150 return false;
00151 }
00152
00153 drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
00154 drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
00155
00156 drizzle_return_t ret= drizzle_con_connect(&_connection);
00157
00158 if (ret != DRIZZLE_RETURN_OK)
00159 {
00160 _last_return= ret;
00161 _last_error_message= "Replication slave: ";
00162 _last_error_message.append(drizzle_error(&_drizzle));
00163 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00164 return false;
00165 }
00166
00167 _is_connected= true;
00168
00169 return true;
00170 }
00171
00172 bool QueueProducer::closeConnection()
00173 {
00174 drizzle_return_t ret;
00175 drizzle_result_st result;
00176
00177 _is_connected= false;
00178
00179 if (drizzle_quit(&_connection, &result, &ret) == NULL)
00180 {
00181 _last_return= ret;
00182 drizzle_result_free(&result);
00183 return false;
00184 }
00185
00186 drizzle_result_free(&result);
00187
00188 return true;
00189 }
00190
00191 bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
00192 {
00193
00194
00195
00196
00197
00198
00199
00200 string sql("SELECT MAX(x.cid) FROM"
00201 " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
00202 " UNION ALL SELECT `last_applied_commit_id` AS cid"
00203 " FROM `sys_replication`.`applier_state`) AS x");
00204
00205 sql::ResultSet result_set(1);
00206 Execute execute(*(_session.get()), true);
00207 execute.run(sql, result_set);
00208 assert(result_set.getMetaData().getColumnCount() == 1);
00209
00210
00211 uint32_t found_rows= 0;
00212 while (result_set.next())
00213 {
00214 string value= result_set.getString(0);
00215
00216 if ((value == "") || (found_rows == 1))
00217 break;
00218
00219 assert(result_set.isNull(0) == false);
00220 *max_commit_id= boost::lexical_cast<uint64_t>(value);
00221 found_rows++;
00222 }
00223
00224 if (found_rows == 0)
00225 {
00226 _last_error_message= "Could not determine last committed transaction.";
00227 return false;
00228 }
00229
00230 return true;
00231 }
00232
00233 bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
00234 vector<uint64_t> &list)
00235 {
00236 (void)list;
00237 string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
00238 " WHERE `commit_id` > ");
00239 sql.append(boost::lexical_cast<string>(max_commit_id));
00240 sql.append(" ORDER BY `commit_id` LIMIT 25");
00241
00242 drizzle_return_t ret;
00243 drizzle_result_st result;
00244 drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
00245
00246 if (ret != DRIZZLE_RETURN_OK)
00247 {
00248 _last_return= ret;
00249 _last_error_message= "Replication slave: ";
00250 _last_error_message.append(drizzle_error(&_drizzle));
00251 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00252 drizzle_result_free(&result);
00253 return false;
00254 }
00255
00256 ret= drizzle_result_buffer(&result);
00257
00258 if (ret != DRIZZLE_RETURN_OK)
00259 {
00260 _last_return= ret;
00261 _last_error_message= "Replication slave: ";
00262 _last_error_message.append(drizzle_error(&_drizzle));
00263 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00264 drizzle_result_free(&result);
00265 return false;
00266 }
00267
00268 drizzle_row_t row;
00269
00270 while ((row= drizzle_row_next(&result)) != NULL)
00271 {
00272 if (row[0])
00273 {
00274 list.push_back(boost::lexical_cast<uint32_t>(row[0]));
00275 }
00276 else
00277 {
00278 _last_return= ret;
00279 _last_error_message= "Replication slave: Unexpected NULL for trx id";
00280 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00281 drizzle_result_free(&result);
00282 return false;
00283 }
00284 }
00285
00286 drizzle_result_free(&result);
00287 return true;
00288 }
00289
00290
00291 bool QueueProducer::queueInsert(const char *trx_id,
00292 const char *seg_id,
00293 const char *commit_id,
00294 const char *msg,
00295 const char *msg_length)
00296 {
00297 message::Transaction message;
00298
00299 message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
00300
00301
00302
00303
00304 string sql= "INSERT INTO `sys_replication`.`queue`"
00305 " (`trx_id`, `seg_id`, `commit_order`, `msg`) VALUES (";
00306 sql.append(trx_id);
00307 sql.append(", ", 2);
00308 sql.append(seg_id);
00309 sql.append(", ", 2);
00310 sql.append(commit_id);
00311 sql.append(", '", 3);
00312
00313
00314
00315
00316
00317
00318
00319 string message_text;
00320 google::protobuf::TextFormat::PrintToString(message, &message_text);
00321
00322
00323
00324
00325 string::iterator it= message_text.begin();
00326 for (; it != message_text.end(); ++it)
00327 {
00328 if (*it == '\"')
00329 {
00330 it= message_text.insert(it, '\\');
00331 ++it;
00332 }
00333 else if (*it == '\'')
00334 {
00335 it= message_text.insert(it, '\\');
00336 ++it;
00337 it= message_text.insert(it, '\\');
00338 ++it;
00339 }
00340 else if (*it == '\\')
00341 {
00342 it= message_text.insert(it, '\\');
00343 ++it;
00344 it= message_text.insert(it, '\\');
00345 ++it;
00346 it= message_text.insert(it, '\\');
00347 ++it;
00348 }
00349 else if (*it == ';')
00350 {
00351 it= message_text.insert(it, '\\');
00352 ++it;
00353 }
00354 }
00355
00356 sql.append(message_text);
00357 sql.append("')", 2);
00358
00359 vector<string> statements;
00360 statements.push_back(sql);
00361
00362 if (not executeSQL(statements))
00363 {
00364 markInErrorState();
00365 return false;
00366 }
00367
00368 uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
00369 if (tmp_commit_id > _saved_max_commit_id)
00370 _saved_max_commit_id= tmp_commit_id;
00371
00372 return true;
00373 }
00374
00375
00376 enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
00377 {
00378 vector<uint64_t> trx_id_list;
00379
00380 if (not queryForTrxIdList(max_commit_id, trx_id_list))
00381 return ER_YES;
00382
00383 if (trx_id_list.size() == 0)
00384 {
00385 return ER_NO;
00386 }
00387
00388
00389
00390
00391 string sql= "SELECT `id`, `segid`, `commit_id`, `message`, `message_len` "
00392 " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
00393
00394 for (size_t x= 0; x < trx_id_list.size(); x++)
00395 {
00396 if (x > 0)
00397 sql.append(", ", 2);
00398 sql.append(boost::lexical_cast<string>(trx_id_list[x]));
00399 }
00400
00401 sql.append(")", 1);
00402
00403 drizzle_return_t ret;
00404 drizzle_result_st result;
00405 drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
00406
00407 if (ret != DRIZZLE_RETURN_OK)
00408 {
00409 _last_return= ret;
00410 _last_error_message= "Replication slave: ";
00411 _last_error_message.append(drizzle_error(&_drizzle));
00412 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00413 drizzle_result_free(&result);
00414 return ER_YES;
00415 }
00416
00417
00418
00419 ret= drizzle_result_buffer(&result);
00420
00421 if (ret != DRIZZLE_RETURN_OK)
00422 {
00423 _last_return= ret;
00424 _last_error_message= "Replication slave: ";
00425 _last_error_message.append(drizzle_error(&_drizzle));
00426 errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00427 drizzle_result_free(&result);
00428 return ER_YES;
00429 }
00430
00431 drizzle_row_t row;
00432
00433 while ((row= drizzle_row_next(&result)) != NULL)
00434 {
00435 if (not queueInsert(row[0], row[1], row[2], row[3], row[4]))
00436 {
00437 errmsg_printf(error::ERROR,
00438 _("Replication slave: Unable to insert into queue."));
00439 drizzle_result_free(&result);
00440 return ER_YES;
00441 }
00442 }
00443
00444 drizzle_result_free(&result);
00445
00446 return EE_OK;
00447 }
00448
00449
00450 void QueueProducer::setIOState(const string &err_msg, bool status)
00451 {
00452 vector<string> statements;
00453 string sql;
00454 string msg(err_msg);
00455
00456 if (not status)
00457 {
00458 sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
00459 }
00460 else
00461 {
00462 sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
00463 }
00464
00465 sql.append(", `error_msg` = '", 17);
00466
00467
00468 string::iterator it;
00469 for (it= msg.begin(); it != msg.end(); ++it)
00470 {
00471 if (*it == '\'')
00472 {
00473 it= msg.insert(it, '\'');
00474 ++it;
00475 }
00476 else if (*it == ';')
00477 {
00478 it= msg.insert(it, '\\');
00479 ++it;
00480 }
00481 }
00482
00483 sql.append(msg);
00484 sql.append("'", 1);
00485
00486 statements.push_back(sql);
00487 executeSQL(statements);
00488 }
00489
00490 }