00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #ifdef UNIT_TEST
00037
00038 #include <stdlib.h>
00039 #include <stdio.h>
00040 #include <unistd.h>
00041 #include <string.h>
00042 #include <ctype.h>
00043 #include <inttypes.h>
00044
00045 #include "cslib/CSConfig.h"
00046 #include "cslib/CSGlobal.h"
00047 #include "cslib/CSThread.h"
00048 #include "cslib/CSStrUtil.h"
00049 #include "cslib/CSStorage.h"
00050
00051 #include "trans_cache_ms.h"
00052 #include "trans_log_ms.h"
00053
00054 #include "mysql.h"
00055
00056 #define CREATE_TABLE_BODY "\
00057 (\
00058 blob_ref INT NOT NULL AUTO_INCREMENT,\
00059 tab_id INT NOT NULL,\
00060 blob_id BIGINT NOT NULL, \
00061 committed BOOLEAN NOT NULL DEFAULT 0, \
00062 PRIMARY KEY (blob_ref, tab_id)\
00063 )\
00064 ENGINE = INNODB\
00065 "
00066 #ifdef LOG_TABLE
00067 #undef LOG_TABLE
00068 #endif
00069
00070 #define LOG_TABLE "translog"
00071 #define REF_TABLE "transref_%d"
00072 #define MAX_THREADS 20
00073
00074 #define A_DB_ID 123
00075
00076 #define TEST_DATABASE_NAME "TransTest"
00077 static const char *user_name = "root";
00078 static const char *user_passwd = "";
00079 static int port = 3306;
00080 static const char *host = "localhost";
00081 static int nap_time = 1000;
00082 static int max_transaction = 10;
00083 static bool dump_log = false, overflow_crash = false;
00084 static int crash_site = 0;
00085 static int num_threads = 1;
00086
00087 static time_t timeout = 60;
00088 static bool revover_only = false;
00089 static bool recreate = false;
00090
00091 static uint32_t cache_size = 0, log_size = 0;
00092
00093 static MSTrans *trans_log;
00094
00095 static CSThreadList *thread_list;
00096
00097 static MYSQL *new_connection(bool check_for_db);
00098
00099 static CSThread *main_thread;
00100
00101
00102 class TransTestThread : public CSDaemon {
00103 public:
00104 TransTestThread():
00105 CSDaemon(thread_list),
00106 count(0),
00107 myActivity(0),
00108 log(NULL),
00109 stopit(false),
00110 finished(false),
00111 mysql(NULL)
00112 {}
00113
00114 ~TransTestThread()
00115 {
00116 if (log)
00117 log->release();
00118
00119 if (mysql)
00120 mysql_close(mysql);
00121 }
00122
00123 MSTrans *log;
00124 MYSQL *mysql;
00125 uint32_t count;
00126 uint32_t myActivity;
00127
00128 bool stopit;
00129 bool finished;
00130
00131 virtual bool doWork() {return true;}
00132 };
00133
00134
00135 class TransTestWriterThread : public TransTestThread {
00136 public:
00137 TransTestWriterThread():TransTestThread() {}
00138
00139 uint32_t tab_id;
00140 FILE *myLog;
00141
00142 void generate_records();
00143 bool doWork()
00144 {
00145 generate_records();
00146 finished = true;
00147 return true;
00148 }
00149
00150 static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
00151 {
00152 TransTestWriterThread *tt;
00153 enter_();
00154
00155
00156 new_(tt, TransTestWriterThread());
00157
00158 char name[32];
00159 sprintf(name, "write_%d.log", id);
00160 if (recreate)
00161 tt->myLog = fopen(name, "w+");
00162 else {
00163 tt->myLog = fopen(name, "a+");
00164 fprintf(tt->myLog, "====================================================\n");
00165 }
00166
00167 tt->tab_id = id ;
00168 tt->mysql = new_connection(false);
00169 tt->log = trans_log;
00170 trans_log->retain();
00171
00172 return_(tt);
00173 }
00174
00175
00176 };
00177
00178
00179 class TransTestReaderThread : public TransTestThread {
00180 public:
00181 TransTestReaderThread():TransTestThread(){}
00182
00183 bool recovering;
00184 void processTransactionLog();
00185 bool doWork()
00186 {
00187 processTransactionLog();
00188 return true;
00189 }
00190
00191 static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
00192 {
00193 TransTestReaderThread *tt;
00194 enter_();
00195
00196 new_(tt, TransTestReaderThread());
00197 tt->mysql = new_connection(false);
00198 tt->log = log;
00199 tt->log->retain();
00200
00201 tt->log->txn_SetReader(tt);
00202 tt->recovering = false;
00203 return_(tt);
00204 }
00205
00206 bool rec_found(uint64_t id, uint32_t tab_id)
00207 {
00208 char stmt[100];
00209 MYSQL_RES *results = NULL;
00210 bool found;
00211
00212 sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id);
00213 if (mysql_query(mysql, stmt)) {
00214 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00215 printf("%s\n", stmt);
00216 exit(1);
00217 }
00218
00219
00220 results = mysql_store_result(mysql);
00221 if (!results){
00222 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00223 exit(1);
00224 }
00225
00226 found = (mysql_num_rows(results) == 1);
00227 mysql_free_result(results);
00228
00229 return found;
00230
00231 }
00232
00233
00234 };
00235
00236 TransTestReaderThread *TransReader;
00237
00238 static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
00239 {
00240 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
00241 if (msg)
00242 printf("%s\n", msg);
00243 exit(1);
00244 }
00245
00246
00247
00248 static MYSQL *new_connection(bool check_for_db)
00249 {
00250 MYSQL *mysql;
00251
00252 mysql = mysql_init(NULL);
00253 if (!mysql) {
00254 printf( "mysql_init() failed.\n");
00255 exit(1);
00256 }
00257
00258 if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
00259 report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
00260
00261 if (check_for_db) {
00262 MYSQL_RES *results = NULL;
00263
00264 if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
00265 report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
00266
00267 results = mysql_store_result(mysql);
00268 if (!results)
00269 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00270
00271
00272 if (mysql_num_rows(results) != 1) {
00273 if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
00274 report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
00275 }
00276 mysql_free_result(results);
00277 }
00278
00279 if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
00280 report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
00281
00282 return mysql;
00283 }
00284
00285
00286 static void init_database(MYSQL *mysql, int cnt)
00287 {
00288 char stmt[1024];
00289
00290 unlink("ms-trans-log.dat");
00291 mysql_query(mysql, "drop table if exists " LOG_TABLE ";");
00292
00293 if (mysql_query(mysql, "create table " LOG_TABLE CREATE_TABLE_BODY ";")){
00294 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00295 exit(1);
00296 }
00297
00298 while (cnt) {
00299 sprintf(stmt, "drop table if exists " REF_TABLE ";", cnt);
00300 mysql_query(mysql, stmt);
00301 sprintf(stmt, "create table " REF_TABLE CREATE_TABLE_BODY ";", cnt);
00302 if (mysql_query(mysql, stmt)){
00303 printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00304 exit(1);
00305 }
00306 cnt--;
00307 }
00308 }
00309
00310
00311
00312 static void display_help(const char *app)
00313 {
00314 printf("\nUsage:\n");
00315 printf("%s -help | -r [-t<num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>] [-t<num_threads>] [<timeout>]\n\n", app);
00316
00317 printf("-r: Test recovery after a crash or shutdown.\n");
00318 printf("-d: Dump the transaction log.\n");
00319 printf("-n: Recreate the tables and recovery log.\n");
00320 printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
00321 printf("-t<num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
00322
00323 printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %d.\n\n", timeout);
00324 exit(1);
00325 }
00326
00327
00328 static void process_args(int argc, const char * argv[])
00329 {
00330 if (argc < 2)
00331 return;
00332
00333 for (int i = 1; i < argc; ) {
00334 if ( argv[i][0] != '-') {
00335 timeout = atoi(argv[i]);
00336 i++;
00337 if ((i != argc) || !timeout)
00338 display_help(argv[0]);
00339 } else {
00340 switch (argv[i][1]) {
00341 case 'h':
00342 display_help(argv[0]);
00343 break;
00344
00345 case 'r':
00346 if (argc > 4 || argv[i][2])
00347 display_help(argv[0]);
00348 revover_only = true;
00349 i++;
00350 break;
00351
00352 case 'd':
00353 if (argc != 2 || argv[i][2])
00354 display_help(argv[0]);
00355 dump_log = true;
00356 i++;
00357 break;
00358
00359 case 'n':
00360 if (argv[i][2])
00361 display_help(argv[0]);
00362 recreate = true;
00363 i++;
00364 break;
00365
00366 case 'c':
00367 if (argv[i][2])
00368 display_help(argv[0]);
00369 i++;
00370 crash_site = atoi(argv[i]);
00371 if (crash_site == (MAX_CRASH_POINT + 1))
00372 overflow_crash = true;
00373 else if ((!crash_site) || (crash_site > MAX_CRASH_POINT))
00374 display_help(argv[0]);
00375 i++;
00376 break;
00377
00378 case 's': {
00379 uint32_t size;
00380
00381 size = atol(argv[i+1]);
00382 if (!size)
00383 display_help(argv[0]);
00384
00385 if (argv[i][2] == 'c')
00386 cache_size = size;
00387 else if (argv[i][2] == 'l')
00388 log_size = size;
00389 else
00390 display_help(argv[0]);
00391
00392 i+=2;
00393 }
00394 break;
00395
00396 case 't':
00397 if (argv[i][2])
00398 display_help(argv[0]);
00399 i++;
00400 num_threads = atoi(argv[i]);
00401 if (!num_threads)
00402 display_help(argv[0]);
00403 i++;
00404 break;
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414 default:
00415 display_help(argv[0]);
00416 }
00417
00418 }
00419 }
00420 }
00421
00422
00423 static void init_env()
00424 {
00425 cs_init_memory();
00426 CSThread::startUp();
00427 if (!(main_thread = CSThread::newCSThread())) {
00428 CSException::logOSError(CS_CONTEXT, ENOMEM);
00429 exit(1);
00430 }
00431
00432 CSThread::setSelf(main_thread);
00433
00434 enter_();
00435 try_(a) {
00436 trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", true);
00437 new_(thread_list, CSThreadList());
00438 }
00439 catch_(a) {
00440 self->logException();
00441 CSThread::shutDown();
00442 exit(1);
00443 }
00444 cont_(a);
00445
00446 }
00447
00448 static void deinit_env()
00449 {
00450 if (thread_list) {
00451 thread_list->release();
00452 thread_list = NULL;
00453 }
00454
00455 if (trans_log) {
00456 trans_log->release();
00457 trans_log = NULL;
00458 }
00459
00460 if (main_thread) {
00461 main_thread->release();
00462 main_thread = NULL;
00463 }
00464
00465 CSThread::shutDown();
00466 cs_exit_memory();
00467 }
00468
00469 static bool verify_database(MYSQL *mysql)
00470 {
00471 MYSQL_RES **r_results, *l_results = NULL;
00472 MYSQL_ROW r_record, l_record;
00473 bool ok = false;
00474 int i, log_row_cnt, ref_row_cnt = 0, tab_id;
00475 char stmt[1024];
00476
00477 r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
00478
00479 if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref"))
00480 report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
00481
00482 l_results = mysql_store_result(mysql);
00483 if (!l_results)
00484 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00485
00486 log_row_cnt = mysql_num_rows(l_results);
00487 mysql_free_result(l_results);
00488 if (log_row_cnt)
00489 printf("Uncommitted references: %d\n", log_row_cnt);
00490
00491
00492 for (i =0; i < num_threads; i++) {
00493 sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
00494 if (mysql_query(mysql, stmt))
00495 report_mysql_error(mysql, __LINE__, stmt);
00496
00497 r_results[i] = mysql_store_result(mysql);
00498 if (!r_results)
00499 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00500
00501 ref_row_cnt += mysql_num_rows(r_results[i]);
00502 }
00503
00504 if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref"))
00505 report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
00506
00507 l_results = mysql_store_result(mysql);
00508 if (!l_results)
00509 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00510
00511 log_row_cnt = mysql_num_rows(l_results);
00512
00513 if (log_row_cnt != ref_row_cnt) {
00514 if (ref_row_cnt > log_row_cnt) {
00515 printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt, ref_row_cnt);
00516 goto done;
00517 }
00518
00519 printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt, ref_row_cnt);
00520 printf("Possible unreferenced BLOBs\n");
00521 }
00522
00523 if (log_row_cnt == ref_row_cnt) {
00524 for ( i = 0; i < log_row_cnt; i++) {
00525 l_record = mysql_fetch_row(l_results);
00526 tab_id = atol(l_record[1]);
00527 r_record = mysql_fetch_row(r_results[tab_id-1]);
00528 if ((atol(l_record[0]) != atol(r_record[0])) ||
00529 (atol(l_record[1]) != atol(r_record[1])) ||
00530 (atol(l_record[2]) != atol(r_record[2]))) {
00531
00532 printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
00533 printf("field 1: %d =? %d\n", atol(l_record[0]), atol(r_record[0]));
00534 printf("field 2: %d =? %d\n", atol(l_record[1]), atol(r_record[1]));
00535 printf("field 3: %d =? %d\n", atol(l_record[2]), atol(r_record[2]));
00536 goto done;
00537 }
00538
00539 }
00540 } else {
00541
00542 for (i =0; i < num_threads; i++) {
00543 mysql_free_result(r_results[i]);
00544
00545 sprintf(stmt, "select * from "REF_TABLE" where blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
00546 if (mysql_query(mysql, stmt))
00547 report_mysql_error(mysql, __LINE__, stmt);
00548
00549 r_results[i] = mysql_store_result(mysql);
00550 if (!r_results)
00551 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00552
00553 if (mysql_num_rows(r_results[i])) {
00554 printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
00555 goto done;
00556 }
00557 }
00558 }
00559
00560 printf("verify_database() OK.\n");
00561 ok = true;
00562
00563 done:
00564
00565 for (i =0; i < num_threads; i++) {
00566 mysql_free_result(r_results[i]);
00567 }
00568 free(r_results);
00569
00570 mysql_free_result(l_results);
00571
00572 #ifdef DEBUG
00573 if (!ok) {
00574 trans_log->txn_DumpLog("trace.log");
00575 }
00576 #endif
00577 return ok;
00578 }
00579
00580
00581 void TransTestReaderThread::processTransactionLog()
00582 {
00583 MSTransRec rec = {0};
00584 MS_TxnState state;
00585 char stmt[1024];
00586 uint32_t last_tid = 0;
00587 enter_();
00588
00589
00590
00591
00592 try_(a) {
00593 while (!myMustQuit && !stopit) {
00594
00595
00596 log->txn_GetNextTransaction(&rec, &state);
00597 if (myMustQuit)
00598 break;
00599
00600 myActivity++;
00601 #ifdef CHECK_TIDS
00602 if (num_threads == 1) {
00603 ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid == rec.tr_id) || !last_tid);
00604 last_tid = rec.tr_id;
00605 }
00606 #endif
00607 if (!recovering)
00608 count++;
00609
00610 switch (TRANS_TYPE(rec.tr_type)) {
00611 case MS_ReferenceTxn:
00612 case MS_DereferenceTxn:
00613 case MS_RollBackTxn:
00614 case MS_CommitTxn:
00615 case MS_RecoveredTxn:
00616 break;
00617 default:
00618 printf("Unexpected transaction type: %d\n", rec.tr_type);
00619 exit(1);
00620 }
00621
00622 if (state == MS_Committed){
00623
00624
00625 if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
00626 sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
00627 if (mysql_query(mysql, stmt))
00628 report_mysql_error(mysql, __LINE__, stmt);
00629 } else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
00630 sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
00631 if (mysql_query(mysql, stmt))
00632 report_mysql_error(mysql, __LINE__, stmt);
00633 }
00634 } else if (state == MS_RolledBack) {
00635
00636 if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
00637 sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
00638 if (mysql_query(mysql, stmt))
00639 report_mysql_error(mysql, __LINE__, stmt);
00640 }
00641 } else if (state == MS_Recovered) {
00642 printf("Recovered transaction being ignored:\n");
00643 printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
00644 } else {
00645 printf("Unexpected transaction state: %d\n", state);
00646 exit(1);
00647 }
00648
00649
00650 }
00651 }
00652 catch_(a) {
00653 self->logException();
00654 printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
00655 if (!myMustQuit && !stopit)
00656 exit(1);
00657 }
00658 cont_(a);
00659 printf("The transaction log reader shutting down.\n");
00660 exit_();
00661 }
00662
00663
00664 void TransTestWriterThread::generate_records()
00665 {
00666
00667 MS_Txn txn_type;
00668 uint64_t blob_id;
00669 uint64_t blob_ref_id;
00670 int tsize, i;
00671 bool do_delete;
00672
00673 char stmt[1024];
00674 enter_();
00675
00676 try_(a) {
00677 while (!myMustQuit && !stopit) {
00678
00679 myActivity++;
00680 usleep(nap_time);
00681 if (myMustQuit || stopit)
00682 break;
00683
00684 tsize = rand() % max_transaction;
00685
00686 if (mysql_autocommit(mysql, 0))
00687 report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
00688
00689 i = 0;
00690 do {
00691 do_delete = ((rand() %2) == 0);
00692
00693
00694 if (do_delete) {
00695 MYSQL_RES *results = NULL;
00696 MYSQL_ROW record;
00697 int cnt;
00698
00699
00700
00701
00702 txn_type = MS_DereferenceTxn;
00703
00704 sprintf(stmt, "select * from "REF_TABLE, tab_id);
00705 if (mysql_query(mysql, stmt))
00706 report_mysql_error(mysql, __LINE__, stmt);
00707
00708 results = mysql_store_result(mysql);
00709 if (!results)
00710 report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00711
00712 cnt = mysql_num_rows(results);
00713 if (!cnt)
00714 do_delete = false;
00715 else {
00716 mysql_data_seek(results, rand()%cnt);
00717 record = mysql_fetch_row(results);
00718
00719 blob_ref_id = atol(record[0]);
00720 blob_id = atol(record[2]);
00721
00722 sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id);
00723 if (mysql_query(mysql, stmt))
00724 report_mysql_error(mysql, __LINE__, stmt);
00725
00726 if (mysql_affected_rows(mysql) == 0)
00727 do_delete = false;
00728 else
00729 fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);
00730 }
00731
00732 mysql_free_result(results);
00733 }
00734
00735 if (!do_delete) {
00736 blob_id = self->myTID;
00737 txn_type = MS_ReferenceTxn;
00738
00739 sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id);
00740 if (mysql_query(mysql, stmt))
00741 report_mysql_error(mysql, __LINE__, stmt);
00742
00743 blob_ref_id = mysql_insert_id(mysql);
00744 if (!blob_ref_id)
00745 report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
00746
00747 fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id);
00748
00749 sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id);
00750 if (mysql_query(mysql, stmt))
00751 report_mysql_error(mysql, __LINE__, stmt);
00752 }
00753
00754 i++;
00755 count++;
00756 if (i >= tsize) {
00757 bool rollback;
00758
00759 rollback = ((tsize > 0) && ((rand() % 1000) == 0));
00760 if (rollback) {
00761 printf("Rollback\n");
00762 if (mysql_rollback(mysql))
00763 report_mysql_error(mysql, __LINE__, "mysql_rollback()");
00764 fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);
00765 log->txn_LogTransaction(MS_RollBackTxn);
00766 } else {
00767 if (mysql_commit(mysql))
00768 report_mysql_error(mysql, __LINE__, "mysql_commit()");
00769 fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);
00770 log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
00771 }
00772 } else
00773 log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
00774
00775 } while ( i < tsize);
00776
00777 }
00778 }
00779
00780 catch_(a) {
00781 self->logException();
00782 printf("\n\nA writer thread for table %d died! \n\n", tab_id);
00783 if (i == tsize) {
00784 printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
00785 }
00786 if (!myMustQuit && !stopit)
00787 exit(1);
00788 }
00789 cont_(a);
00790 printf("Writer thread for table %d is shutting down.\n", tab_id);
00791 exit_();
00792 }
00793
00794
00795
00796
00797
00798
00799 int main (int argc, const char * argv[])
00800 {
00801 MYSQL *mysql;
00802 TransTestWriterThread **writer = NULL;
00803 int rtc = 1;
00804
00805 process_args(argc, argv);
00806
00807 mysql = new_connection(true);
00808
00809 if (recreate)
00810 init_database(mysql, num_threads);
00811
00812 init_env();
00813 enter_();
00814
00815 if (dump_log) {
00816 printf("LOG dumped\n");
00817 exit(1);
00818 }
00819
00820 TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
00821 push_(TransReader);
00822 TransReader->recovering = true;
00823 TransReader->start();
00824
00825
00826 while (trans_log->txn_GetNumRecords())
00827 usleep(100);
00828
00829 TransReader->recovering = false;
00830
00831 if (log_size)
00832 trans_log->txn_SetLogSize(log_size);
00833
00834 if (cache_size)
00835 trans_log->txn_SetCacheSize(cache_size);
00836
00837 if (revover_only) {
00838 TransReader->stopit = true;
00839 if (verify_database(mysql))
00840 rtc = 0;
00841 goto done;
00842 }
00843
00844 try_(a) {
00845 writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
00846 for (int i = 0; i < num_threads; i++) {
00847 TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
00848 wt->start();
00849 writer[i] = wt;
00850 }
00851
00852 printf("Timeout: %d seconds\n", timeout);
00853 timeout += time(NULL);
00854 int header = 0;
00855 while (timeout > time(NULL)) {
00856 MSTransStatsRec stats;
00857 self->sleep(1000);
00858 trans_log->txn_GetStats(&stats);
00859
00860
00861 if (!(header%20)) {
00862 for (int i = 0; i < num_threads; i++) {
00863 if (writer[i]->myActivity == 0) {
00864 printf("Writer thread %d HUNG!!!\n", i);
00865 }
00866 writer[i]->myActivity = 0;
00867 }
00868
00869 if (TransReader->myActivity == 0) {
00870 printf("Reader thread HUNG!!!\n");
00871 }
00872 TransReader->myActivity = 0;
00873
00874 printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
00875 }
00876 header++;
00877
00878 printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",
00879 stats.ts_LogSize,
00880 stats.ts_PercentFull,
00881 stats.ts_MaxSize,
00882 stats.ts_OverflowCount,
00883 (stats.ts_IsOverflowing)?"Over Flow": " --- ",
00884 stats.ts_TransCacheSize,
00885 stats.ts_PercentTransCacheUsed,
00886 stats.ts_PercentCacheHit
00887 );
00888
00889 if (stats.ts_IsOverflowing && overflow_crash) {
00890 printf("Simulating crash while in overflow\n");
00891 exit(1);
00892 }
00893 }
00894
00895 #ifdef CRASH_TEST
00896 if (crash_site) {
00897 printf("Crashing at crash site %d\n", crash_site);
00898 trans_test_crash_point = crash_site;
00899
00900 while(1)
00901 self->sleep(1000);
00902 }
00903 #endif
00904
00905 printf("Shutting down the writer threads:\n");
00906 for (int i = 0; i < num_threads; i++) {
00907 writer[i]->stopit = true;
00908 }
00909
00910 TransReader->stopit = true;
00911
00912 int cnt = 100;
00913 while (cnt) {
00914 int i;
00915 for (i = 0; i < num_threads && writer[i]->finished; i++);
00916 if (i == num_threads && TransReader->finished)
00917 break;
00918 self->sleep(10);
00919 cnt--;
00920 }
00921
00922 for (int i = 0; i < num_threads; i++) {
00923 writer[i]->stop();
00924 }
00925
00926 }
00927 rtc = 0;
00928 catch_(a) {
00929 printf("Main thread abort.\n");
00930 self->logException();
00931 }
00932 cont_(a);
00933 if (writer) {
00934 for (int i = 0; i < num_threads; i++) {
00935 writer[i]->stop();
00936 writer[i]->release();
00937 }
00938 cs_free(writer);
00939 }
00940
00941 done:
00942 TransReader->stop();
00943 release_(TransReader);
00944
00945 outer_();
00946
00947 thread_list->stopAllThreads();
00948 deinit_env();
00949 mysql_close(mysql);
00950 exit(rtc);
00951 }
00952
00953 #endif // UNIT_TEST