Drizzled Public API Documentation

replication_schema.cc
1 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2011 David Shrewsbury
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 #include <plugin/slave/replication_schema.h>
23 #include <drizzled/execute.h>
24 #include <drizzled/sql/result_set.h>
25 #include <string>
26 #include <vector>
27 #include <boost/lexical_cast.hpp>
28 
29 using namespace std;
30 using namespace drizzled;
31 using namespace boost;
32 
33 namespace slave
34 {
35 
36 bool ReplicationSchema::create()
37 {
38  vector<string> sql;
39 
40  sql.push_back("COMMIT");
41  sql.push_back("CREATE SCHEMA IF NOT EXISTS `sys_replication` REPLICATE=FALSE");
42 
43  if (not executeSQL(sql))
44  return false;
45 
46  /*
47  Create our IO thread state information table if we need to.
48  */
49 
50  /*
51  Table: io_state
52  Version 1.0: Initial definition
53  Version 1.1: Added master_id and PK on master_id
54  */
55 
56 
57  sql.clear();
58  sql.push_back("COMMIT");
59  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
60  " `master_id` BIGINT NOT NULL,"
61  " `status` VARCHAR(20) NOT NULL,"
62  " `error_msg` VARCHAR(250))"
63  " COMMENT = 'VERSION 1.1'");
64 
65  if (not executeSQL(sql))
66  return false;
67 
68  /*
69  * Create our applier thread state information table if we need to.
70  */
71 
72  /*
73  * Table: applier_state
74  * Version 1.0: Initial definition
75  * Version 1.1: Added originating_server_uuid and originating_commit_id
76  * Version 1.2: Added master_id and changed PK to master_id
77  */
78 
79  sql.clear();
80  sql.push_back("COMMIT");
81  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
82  " (`master_id` BIGINT NOT NULL,"
83  " `last_applied_commit_id` BIGINT NOT NULL,"
84  " `originating_server_uuid` VARCHAR(36) NOT NULL,"
85  " `originating_commit_id` BIGINT NOT NULL,"
86  " `status` VARCHAR(20) NOT NULL,"
87  " `error_msg` VARCHAR(250))"
88  " COMMENT = 'VERSION 1.2'");
89 
90  if (not executeSQL(sql))
91  return false;
92 
93  /*
94  * Create our message queue table if we need to.
95  * Version 1.0: Initial definition
96  * Version 1.1: Added originating_server_uuid and originating_commit_id
97  * Version 1.2: Added master_id and changed PK to (master_id, trx_id, seg_id)
98  */
99 
100  sql.clear();
101  sql.push_back("COMMIT");
102  sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue`"
103  " (`trx_id` BIGINT NOT NULL, `seg_id` INT NOT NULL,"
104  " `commit_order` BIGINT,"
105  " `originating_server_uuid` VARCHAR(36) NOT NULL,"
106  " `originating_commit_id` BIGINT NOT NULL,"
107  " `msg` BLOB,"
108  " `master_id` BIGINT NOT NULL,"
109  " PRIMARY KEY(`master_id`, `trx_id`, `seg_id`))"
110  " COMMENT = 'VERSION 1.2'");
111  if (not executeSQL(sql))
112  return false;
113 
114  return true;
115 }
116 
117 bool ReplicationSchema::createInitialIORow(uint32_t master_id)
118 {
119  vector<string> sql;
120 
121  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
122 
123  sql::ResultSet result_set(1);
124  Execute execute(*(_session.get()), true);
125  execute.run(sql[0], result_set);
126  result_set.next();
127  string count= result_set.getString(0);
128 
129  if (count == "0")
130  {
131  sql.clear();
132  sql.push_back("INSERT INTO `sys_replication`.`io_state` (`master_id`, `status`) VALUES ("
133  + boost::lexical_cast<string>(master_id)
134  + ", 'STOPPED')");
135  if (not executeSQL(sql))
136  return false;
137  }
138 
139  return true;
140 }
141 
142 bool ReplicationSchema::createInitialApplierRow(uint32_t master_id)
143 {
144  vector<string> sql;
145 
146  sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state` WHERE `master_id` = " + boost::lexical_cast<string>(master_id));
147 
148  sql::ResultSet result_set(1);
149  Execute execute(*(_session.get()), true);
150  execute.run(sql[0], result_set);
151  result_set.next();
152  string count= result_set.getString(0);
153 
154  if (count == "0")
155  {
156  sql.clear();
157  sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
158  " (`master_id`, `last_applied_commit_id`,"
159  " `originating_server_uuid`, `originating_commit_id`,"
160  " `status`) VALUES ("
161  + boost::lexical_cast<string>(master_id)
162  + ",0,"
163  + "'" + _session.get()->getServerUUID() + "'"
164  + ",0,'STOPPED')");
165 
166  if (not executeSQL(sql))
167  return false;
168  }
169 
170  return true;
171 }
172 
173 
174 bool ReplicationSchema::setInitialMaxCommitId(uint32_t master_id, uint64_t value)
175 {
176  vector<string> sql;
177 
178  sql.push_back("UPDATE `sys_replication`.`applier_state`"
179  " SET `last_applied_commit_id` = "
180  + lexical_cast<string>(value)
181  + " WHERE `master_id` = "
182  + lexical_cast<string>(master_id));
183 
184  return executeSQL(sql);
185 }
186 
187 } /* namespace slave */