00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <config.h>
00025 #include <drizzled/algorithm/crc32.h>
00026 #include <drizzled/gettext.h>
00027 #include <drizzled/replication_services.h>
00028
00029 #include <sys/types.h>
00030 #include <sys/stat.h>
00031 #include <fcntl.h>
00032 #include <string>
00033 #include <fstream>
00034 #include <unistd.h>
00035
00036 #if TIME_WITH_SYS_TIME
00037 # include <sys/time.h>
00038 # include <time.h>
00039 #else
00040 # if HAVE_SYS_TIME_H
00041 # include <sys/time.h>
00042 # else
00043 # include <time.h>
00044 # endif
00045 #endif
00046
00047 #include <drizzled/message/transaction.pb.h>
00048
00049 #include <google/protobuf/io/coded_stream.h>
00050 #include <google/protobuf/io/zero_copy_stream_impl.h>
00051
00052 #include <drizzled/gettext.h>
00053
00058 using namespace std;
00059 using namespace drizzled;
00060 using namespace google;
00061
namespace
{

/** Server id stamped on every transaction context this tool creates. */
uint32_t server_id= 1;

/** Id given to the next transaction; incremented each time a context is initialised. */
uint64_t transaction_id= 1;

} /* anonymous namespace */
00064
/**
 * Returns the current wall-clock time in nanoseconds since the Unix epoch.
 *
 * When HAVE_CLOCK_GETTIME is defined the value comes from
 * clock_gettime(CLOCK_REALTIME); otherwise gettimeofday() is used and the
 * microsecond part is scaled up to nanoseconds.
 *
 * @return nanoseconds since the epoch
 */
static uint64_t getNanoTimestamp()
{
  /*
   * One second is 10^9 nanoseconds. The multiplier used to be 10^7, which
   * made the seconds part overlap the sub-second part and produced values
   * that were not nanoseconds at all.
   */
  static const uint64_t NANOSECONDS_PER_SECOND= 1000000000ULL;

#ifdef HAVE_CLOCK_GETTIME
  struct timespec tp;
  clock_gettime(CLOCK_REALTIME, &tp);
  return static_cast<uint64_t>(tp.tv_sec) * NANOSECONDS_PER_SECOND
       + static_cast<uint64_t>(tp.tv_nsec);
#else
  static const uint64_t NANOSECONDS_PER_MICROSECOND= 1000ULL;

  struct timeval tv;
  gettimeofday(&tv, NULL);
  return static_cast<uint64_t>(tv.tv_sec) * NANOSECONDS_PER_SECOND
       + static_cast<uint64_t>(tv.tv_usec) * NANOSECONDS_PER_MICROSECOND;
#endif
}
00079
00080 static void initTransactionContext(message::Transaction &transaction)
00081 {
00082 message::TransactionContext *ctx= transaction.mutable_transaction_context();
00083 ctx->set_transaction_id(transaction_id++);
00084 ctx->set_start_timestamp(getNanoTimestamp());
00085 ctx->set_server_id(server_id);
00086 }
00087
00088 static void finalizeTransactionContext(message::Transaction &transaction)
00089 {
00090 message::TransactionContext *ctx= transaction.mutable_transaction_context();
00091 ctx->set_end_timestamp(getNanoTimestamp());
00092 }
00093
00094 static void doCreateTable1(message::Transaction &transaction)
00095 {
00096 message::Statement *statement= transaction.add_statement();
00097
00098 statement->set_type(message::Statement::RAW_SQL);
00099 statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
00100 statement->set_start_timestamp(getNanoTimestamp());
00101 statement->set_end_timestamp(getNanoTimestamp());
00102 }
00103
00104 static void doCreateTable2(message::Transaction &transaction)
00105 {
00106 message::Statement *statement= transaction.add_statement();
00107
00108 statement->set_type(message::Statement::RAW_SQL);
00109 statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
00110 statement->set_start_timestamp(getNanoTimestamp());
00111 statement->set_end_timestamp(getNanoTimestamp());
00112 }
00113
00114 static void doCreateTable3(message::Transaction &transaction)
00115 {
00116 message::Statement *statement= transaction.add_statement();
00117
00118 statement->set_type(message::Statement::RAW_SQL);
00119 statement->set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
00120 statement->set_start_timestamp(getNanoTimestamp());
00121 statement->set_end_timestamp(getNanoTimestamp());
00122 }
00123
00124 static void doSimpleInsert(message::Transaction &transaction)
00125 {
00126 message::Statement *statement= transaction.add_statement();
00127
00128
00129 statement->set_type(message::Statement::INSERT);
00130 statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
00131 statement->set_start_timestamp(getNanoTimestamp());
00132
00133
00134 message::InsertHeader *header= statement->mutable_insert_header();
00135
00136
00137 message::TableMetadata *t_meta= header->mutable_table_metadata();
00138 t_meta->set_schema_name("test");
00139 t_meta->set_table_name("t1");
00140
00141 message::FieldMetadata *f_meta= header->add_field_metadata();
00142 f_meta->set_name("a");
00143 f_meta->set_type(message::Table::Field::VARCHAR);
00144
00145
00146 message::InsertData *data= statement->mutable_insert_data();
00147 data->set_segment_id(1);
00148 data->set_end_segment(true);
00149
00150 message::InsertRecord *record1= data->add_record();
00151 message::InsertRecord *record2= data->add_record();
00152
00153 record1->add_insert_value("1");
00154 record2->add_insert_value("2");
00155
00156 statement->set_end_timestamp(getNanoTimestamp());
00157 }
00158
00159 static void doNonVarcharInsert(message::Transaction &transaction)
00160 {
00161 message::Statement *statement= transaction.add_statement();
00162
00163
00164 statement->set_type(message::Statement::INSERT);
00165 statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
00166 statement->set_start_timestamp(getNanoTimestamp());
00167
00168
00169 message::InsertHeader *header= statement->mutable_insert_header();
00170
00171
00172 message::TableMetadata *t_meta= header->mutable_table_metadata();
00173 t_meta->set_schema_name("test");
00174 t_meta->set_table_name("t2");
00175
00176 message::FieldMetadata *f_meta= header->add_field_metadata();
00177 f_meta->set_name("a");
00178 f_meta->set_type(message::Table::Field::INTEGER);
00179
00180
00181 message::InsertData *data= statement->mutable_insert_data();
00182 data->set_segment_id(1);
00183 data->set_end_segment(true);
00184
00185 message::InsertRecord *record1= data->add_record();
00186 message::InsertRecord *record2= data->add_record();
00187
00188 record1->add_insert_value("1");
00189 record2->add_insert_value("2");
00190
00191 statement->set_end_timestamp(getNanoTimestamp());
00192 }
00193
00194 static void doBlobInsert(message::Transaction &transaction)
00195 {
00196 message::Statement *statement= transaction.add_statement();
00197
00198
00199 statement->set_type(message::Statement::INSERT);
00200 statement->set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43);
00201 statement->set_start_timestamp(getNanoTimestamp());
00202
00203
00204 message::InsertHeader *header= statement->mutable_insert_header();
00205
00206
00207 message::TableMetadata *t_meta= header->mutable_table_metadata();
00208 t_meta->set_schema_name("test");
00209 t_meta->set_table_name("t3");
00210
00211 message::FieldMetadata *f_meta= header->add_field_metadata();
00212 f_meta->set_name("a");
00213 f_meta->set_type(message::Table::Field::INTEGER);
00214
00215 f_meta= header->add_field_metadata();
00216 f_meta->set_name("b");
00217 f_meta->set_type(message::Table::Field::BLOB);
00218
00219
00220 message::InsertData *data= statement->mutable_insert_data();
00221 data->set_segment_id(1);
00222 data->set_end_segment(true);
00223
00224 message::InsertRecord *record1= data->add_record();
00225
00226 record1->add_insert_value("1");
00227 record1->add_insert_value("test\0me", 7);
00228
00229 statement->set_end_timestamp(getNanoTimestamp());
00230 }
00231
00232 static void doSimpleDelete(message::Transaction &transaction)
00233 {
00234 message::Statement *statement= transaction.add_statement();
00235
00236
00237 statement->set_type(message::Statement::DELETE);
00238 statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
00239 statement->set_start_timestamp(getNanoTimestamp());
00240
00241
00242 message::DeleteHeader *header= statement->mutable_delete_header();
00243
00244
00245 message::TableMetadata *t_meta= header->mutable_table_metadata();
00246 t_meta->set_schema_name("test");
00247 t_meta->set_table_name("t1");
00248
00249 message::FieldMetadata *f_meta= header->add_key_field_metadata();
00250 f_meta->set_name("a");
00251 f_meta->set_type(message::Table::Field::VARCHAR);
00252
00253
00254 message::DeleteData *data= statement->mutable_delete_data();
00255 data->set_segment_id(1);
00256 data->set_end_segment(true);
00257
00258 message::DeleteRecord *record1= data->add_record();
00259
00260 record1->add_key_value("1");
00261
00262 statement->set_end_timestamp(getNanoTimestamp());
00263 }
00264
00265 static void doSimpleUpdate(message::Transaction &transaction)
00266 {
00267 message::Statement *statement= transaction.add_statement();
00268
00269
00270 statement->set_type(message::Statement::UPDATE);
00271 statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
00272 statement->set_start_timestamp(getNanoTimestamp());
00273
00274
00275 message::UpdateHeader *header= statement->mutable_update_header();
00276
00277
00278 message::TableMetadata *t_meta= header->mutable_table_metadata();
00279 t_meta->set_schema_name("test");
00280 t_meta->set_table_name("t1");
00281
00282 message::FieldMetadata *kf_meta= header->add_key_field_metadata();
00283 kf_meta->set_name("a");
00284 kf_meta->set_type(message::Table::Field::VARCHAR);
00285
00286 message::FieldMetadata *sf_meta= header->add_set_field_metadata();
00287 sf_meta->set_name("a");
00288 sf_meta->set_type(message::Table::Field::VARCHAR);
00289
00290
00291 message::UpdateData *data= statement->mutable_update_data();
00292 data->set_segment_id(1);
00293 data->set_end_segment(true);
00294
00295 message::UpdateRecord *record1= data->add_record();
00296
00297 record1->add_after_value("5");
00298 record1->add_key_value("1");
00299
00300 statement->set_end_timestamp(getNanoTimestamp());
00301 }
00302
00303 static void doMultiKeyUpdate(message::Transaction &transaction)
00304 {
00305 message::Statement *statement= transaction.add_statement();
00306
00307
00308 statement->set_type(message::Statement::UPDATE);
00309 statement->set_sql("UPDATE t1 SET a = \"5\"");
00310 statement->set_start_timestamp(getNanoTimestamp());
00311
00312
00313 message::UpdateHeader *header= statement->mutable_update_header();
00314
00315
00316 message::TableMetadata *t_meta= header->mutable_table_metadata();
00317 t_meta->set_schema_name("test");
00318 t_meta->set_table_name("t1");
00319
00320 message::FieldMetadata *kf_meta= header->add_key_field_metadata();
00321 kf_meta->set_name("a");
00322 kf_meta->set_type(message::Table::Field::VARCHAR);
00323
00324 message::FieldMetadata *sf_meta= header->add_set_field_metadata();
00325 sf_meta->set_name("a");
00326 sf_meta->set_type(message::Table::Field::VARCHAR);
00327
00328
00329 message::UpdateData *data= statement->mutable_update_data();
00330 data->set_segment_id(1);
00331 data->set_end_segment(true);
00332
00333 message::UpdateRecord *record1= data->add_record();
00334 message::UpdateRecord *record2= data->add_record();
00335
00336 record1->add_after_value("5");
00337 record1->add_key_value("1");
00338 record2->add_after_value("5");
00339 record2->add_key_value("2");
00340
00341 statement->set_end_timestamp(getNanoTimestamp());
00342 }
00343
00344 static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
00345 {
00346 std::string buffer("");
00347 finalizeTransactionContext(transaction);
00348 transaction.SerializeToString(&buffer);
00349
00350 size_t length= buffer.length();
00351
00352 output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
00353 output->WriteLittleEndian32(static_cast<uint32_t>(length));
00354 output->WriteString(buffer);
00355 output->WriteLittleEndian32(drizzled::algorithm::crc32(buffer.c_str(), length));
00356 }
00357
00358 int main(int argc, char* argv[])
00359 {
00360 GOOGLE_PROTOBUF_VERIFY_VERSION;
00361 int file;
00362
00363 if (argc != 2)
00364 {
00365 fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
00366 return -1;
00367 }
00368
00369 if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
00370 {
00371 fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
00372 return -1;
00373 }
00374
00375 protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
00376 protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
00377
00378
00379 message::Transaction transaction;
00380
00381
00382 initTransactionContext(transaction);
00383 doCreateTable1(transaction);
00384 writeTransaction(coded_output, transaction);
00385 transaction.Clear();
00386
00387 initTransactionContext(transaction);
00388 doCreateTable2(transaction);
00389 writeTransaction(coded_output, transaction);
00390 transaction.Clear();
00391
00392
00393 initTransactionContext(transaction);
00394 doSimpleInsert(transaction);
00395 writeTransaction(coded_output, transaction);
00396 transaction.Clear();
00397
00398
00399 initTransactionContext(transaction);
00400 doSimpleDelete(transaction);
00401 doSimpleUpdate(transaction);
00402 writeTransaction(coded_output, transaction);
00403 transaction.Clear();
00404
00405
00406 initTransactionContext(transaction);
00407 doNonVarcharInsert(transaction);
00408 writeTransaction(coded_output, transaction);
00409 transaction.Clear();
00410
00411
00412 initTransactionContext(transaction);
00413 doMultiKeyUpdate(transaction);
00414 writeTransaction(coded_output, transaction);
00415 transaction.Clear();
00416
00417
00418 initTransactionContext(transaction);
00419 doCreateTable3(transaction);
00420 doBlobInsert(transaction);
00421 writeTransaction(coded_output, transaction);
00422 transaction.Clear();
00423
00424 delete coded_output;
00425 delete raw_output;
00426
00427 return 0;
00428 }