00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "cslib/CSConfig.h"
00031
00032 #include <inttypes.h>
00033
00034 #include "defs_ms.h"
00035
00036 #include "cslib/CSGlobal.h"
00037 #include "cslib/CSStrUtil.h"
00038 #include "cslib/CSLog.h"
00039
00040 #include "mysql_ms.h"
00041 #include "open_table_ms.h"
00042 #include "trans_log_ms.h"
00043 #include "transaction_ms.h"
00044 #include "pbmsdaemon_ms.h"
00045
00046
00047
00048
00049 void pbms_take_part_in_transaction(void *thread);
00050
00051 MSTrans *MSTransactionManager::tm_Log;
00052 MSTransactionThread *MSTransactionManager::tm_Reader;
00053
00054
00055 typedef struct {
00056 CSDiskValue4 lr_time_4;
00057 CSDiskValue1 lr_state_1;
00058 CSDiskValue1 lr_type_1;
00059 CSDiskValue4 lr_db_id_4;
00060 CSDiskValue4 lr_tab_id_4;
00061 CSDiskValue8 lr_blob_id_8;
00062 CSDiskValue8 lr_blob_ref_id_8;
00063 } MSDiskLostRec, *MSDiskLostPtr;
00064
00065
00066
00067
00068
00069
00070 class MSTransactionThread : public CSDaemon {
00071 public:
00072 MSTransactionThread(MSTrans *txn_log);
00073
00074 virtual ~MSTransactionThread(){}
00075
00076
00077 void close();
00078
00079 virtual bool doWork();
00080
00081 virtual void *completeWork();
00082
00083 void flush();
00084
00085 bool trt_is_ready;
00086 private:
00087 void reportLostReference(MSTransPtr rec, MS_TxnState state);
00088 void dereference(MSTransPtr rec, MS_TxnState state);
00089 void commitReference(MSTransPtr rec, MS_TxnState state);
00090
00091 MSTrans *trt_log;
00092 CSFile *trt_lostLog;
00093
00094 };
00095
00096 MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
00097 CSDaemon(0, NULL),
00098 trt_is_ready(false),
00099 trt_log(txn_log),
00100 trt_lostLog(NULL)
00101 {
00102 trt_log->txn_SetReader(this);
00103 }
00104
00105 void MSTransactionThread::close()
00106 {
00107 if (trt_lostLog)
00108 trt_lostLog->close();
00109 }
00110
00111 void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
00112 {
00113 MSDiskLostRec lrec;
00114 const char *t_txt, *s_txt;
00115 char b1[16], b2[16], msg[100];
00116 MSDatabase *db;
00117 MSTable *tab;
00118
00119
00120
00121
00122 enter_();
00123
00124
00125
00126
00127 db = MSDatabase::getDatabase(rec->tr_db_id, true);
00128 if (!db)
00129 goto dont_worry_about_it;
00130
00131 push_(db);
00132 tab = db->getTable(rec->tr_tab_id, true);
00133 release_(db);
00134 if (!tab)
00135 goto dont_worry_about_it;
00136 tab->release();
00137
00138 switch (state) {
00139 case MS_Committed:
00140 s_txt = "Commit";
00141 break;
00142 case MS_RolledBack:
00143 s_txt = "RolledBack";
00144 break;
00145 case MS_Recovered:
00146 s_txt = "Recovered";
00147 break;
00148 case MS_Running:
00149 s_txt = "Running";
00150 break;
00151 default:
00152 snprintf(b1, 16, "(%d)?", state);
00153 s_txt = b1;
00154 }
00155
00156 switch (TRANS_TYPE(rec->tr_type)) {
00157 case MS_DereferenceTxn:
00158 t_txt = "Dereference";
00159 break;
00160 case MS_ReferenceTxn:
00161 t_txt = "Reference";
00162 break;
00163 default:
00164 snprintf(b2, 16, "(%x)?", rec->tr_type);
00165 t_txt = b2;
00166 }
00167
00168 snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
00169 CSL.logLine(self, CSLog::Warning, msg);
00170
00171 CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
00172 CS_SET_DISK_1(lrec.lr_state_1, state);
00173 CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
00174 CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
00175 CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
00176 CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
00177 CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
00178
00179 if (!trt_lostLog) {
00180 CSPath *path;
00181 char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
00182 cs_remove_last_name_of_path(str);
00183
00184 path = CSPath::newPath(str, "pbms_lost_txn.dat");
00185 cs_free(str);
00186
00187 trt_lostLog = CSFile::newFile(path);
00188 trt_lostLog->open(CSFile::CREATE);
00189 }
00190 trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
00191 trt_lostLog->sync();
00192
00193 dont_worry_about_it:
00194 exit_();
00195
00196 }
00197
00198 void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
00199 {
00200 enter_();
00201
00202 try_(a) {
00203 MSOpenTable *otab;
00204 otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
00205 frompool_(otab);
00206 otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
00207 backtopool_(otab);
00208 }
00209
00210 catch_(a) {
00211 reportLostReference(rec, state);
00212 }
00213
00214 cont_(a);
00215 exit_();
00216 }
00217
00218 void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
00219 {
00220 enter_();
00221
00222 try_(a) {
00223 MSOpenTable *otab;
00224 otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
00225 frompool_(otab);
00226 otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
00227 backtopool_(otab);
00228 }
00229
00230 catch_(a) {
00231 reportLostReference(rec, state);
00232 }
00233
00234 cont_(a);
00235 exit_();
00236 }
00237
00238 void MSTransactionThread::flush()
00239 {
00240 enter_();
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 wakeup();
00251 while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
00252 self->sleep(10);
00253 exit_();
00254 }
00255
00256 bool MSTransactionThread::doWork()
00257 {
00258 enter_();
00259
00260 try_(a) {
00261 MSTransRec rec = {0,0,0,0,0,0,0};
00262 MS_TxnState state;
00263 while (!myMustQuit) {
00264
00265
00266 trt_log->txn_GetNextTransaction(&rec, &state);
00267 if (myMustQuit)
00268 break;
00269
00270 if (rec.tr_db_id == 0)
00271 continue;
00272
00273 if (state == MS_Committed){
00274 if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn)
00275 dereference(&rec, state);
00276 else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
00277 commitReference(&rec, state);
00278
00279 } else if (state == MS_RolledBack) {
00280
00281
00282 if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
00283 dereference(&rec, state);
00284
00285 } else if (state == MS_Recovered) {
00286 if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
00287 reportLostReference(&rec, state);
00288
00289
00290
00291
00292
00293
00294
00295 if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)
00296 commitReference(&rec, state);
00297
00298 }
00299 }
00300 }
00301
00302 catch_(a) {
00303 self->logException();
00304 CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
00305 }
00306 cont_(a);
00307 return_(true);
00308 }
00309
00310 void *MSTransactionThread::completeWork()
00311 {
00312 close();
00313
00314 if (trt_log)
00315 trt_log->release();
00316
00317 if (trt_lostLog)
00318 trt_lostLog->release();
00319 return NULL;
00320 }
00321
00322
00323
00324
00325
00326 void MSTransactionManager::startUpReader()
00327 {
00328 char pbms_path[PATH_MAX];
00329 enter_();
00330
00331 cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir());
00332 cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
00333
00334 tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
00335 new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
00336
00337 tm_Reader->start();
00338
00339
00340 tm_Reader->flush();
00341
00342 exit_();
00343 }
00344
00345 void MSTransactionManager::startUp()
00346 {
00347 CSPath *path = NULL;
00348 enter_();
00349
00350
00351 path = CSPath::newPath(PBMSDaemon::getPBMSDir());
00352 push_(path);
00353 if (path->exists()) {
00354 startUpReader();
00355 }
00356 release_(path);
00357
00358 exit_();
00359 }
00360
00361 void MSTransactionManager::shutDown()
00362 {
00363 if (tm_Reader) {
00364 tm_Reader->stop();
00365 tm_Reader->release();
00366 tm_Reader = NULL;
00367 }
00368 if (tm_Log) {
00369 tm_Log->release();
00370 tm_Log = NULL;
00371 }
00372 }
00373
00374 void MSTransactionManager::flush()
00375 {
00376 if (tm_Reader)
00377 tm_Reader->flush();
00378 }
00379
00380 void MSTransactionManager::suspend(bool do_flush)
00381 {
00382 enter_();
00383
00384 if (do_flush)
00385 flush();
00386
00387 if (tm_Reader) {
00388 tm_Reader->suspend();
00389 }
00390 exit_();
00391 }
00392
00393 void MSTransactionManager::resume()
00394 {
00395 enter_();
00396 if (tm_Reader) {
00397 tm_Reader->resume();
00398 }
00399 exit_();
00400 }
00401
00402 void MSTransactionManager::commit()
00403 {
00404 enter_();
00405
00406 if (!tm_Log)
00407 startUpReader();
00408
00409 self->myStmtCount = 0;
00410 self->myStartStmt = 0;
00411
00412 tm_Log->txn_LogTransaction(MS_CommitTxn);
00413
00414
00415 exit_();
00416 }
00417
00418 void MSTransactionManager::rollback()
00419 {
00420 enter_();
00421
00422 if (!tm_Log)
00423 startUpReader();
00424
00425 self->myStmtCount = 0;
00426 self->myStartStmt = 0;
00427
00428 tm_Log->txn_LogTransaction(MS_RollBackTxn);
00429
00430 exit_();
00431 }
00432
00433 class MSTransactionCheckPoint: public CSString
00434 {
00435 public:
00436 MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSString(name)
00437 {
00438 position = stmtCount;
00439 }
00440
00441 uint32_t position;
00442 };
00443
00444 #ifdef DRIZZLED
00445 void MSTransactionManager::setSavepoint(const char *savePoint)
00446 {
00447 MSTransactionCheckPoint *checkPoint;
00448 enter_();
00449
00450 new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
00451
00452 push_(checkPoint);
00453 self->mySavePoints.add(checkPoint);
00454 pop_(checkPoint);
00455
00456 exit_();
00457 }
00458
00459 void MSTransactionManager::releaseSavepoint(const char *savePoint)
00460 {
00461 MSTransactionCheckPoint *checkPoint;
00462 CSString *name;
00463 enter_();
00464
00465 name = CSString::newString(savePoint);
00466 push_(name);
00467
00468 checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
00469 release_(name);
00470
00471 if (checkPoint)
00472 self->mySavePoints.remove(checkPoint);
00473
00474 exit_();
00475 }
00476
00477 void MSTransactionManager::rollbackTo(const char *savePoint)
00478 {
00479 MSTransactionCheckPoint *checkPoint;
00480 CSString *name;
00481 enter_();
00482
00483 name = CSString::newString(savePoint);
00484 push_(name);
00485
00486 checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
00487 release_(name);
00488
00489 if (checkPoint) {
00490 uint32_t position = checkPoint->position;
00491
00492 self->mySavePoints.remove(checkPoint);
00493 rollbackToPosition(position);
00494 }
00495
00496 exit_();
00497 }
00498 #endif
00499
00500 void MSTransactionManager::rollbackToPosition(uint32_t position)
00501 {
00502 enter_();
00503
00504 ASSERT(self->myStmtCount > position);
00505
00506 if (!tm_Log)
00507 startUpReader();
00508 tm_Log->txn_LogPartialRollBack(position);
00509
00510 exit_();
00511 }
00512
00513 void MSTransactionManager::dropDatabase(uint32_t db_id)
00514 {
00515 enter_();
00516
00517 if (!tm_Log)
00518 startUpReader();
00519
00520 tm_Log->txn_dropDatabase(db_id);
00521
00522 exit_();
00523 }
00524
00525 void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
00526 {
00527 enter_();
00528
00529 if (!tm_Log)
00530 startUpReader();
00531
00532 if (!self->myTID) {
00533 bool autocommit = false;
00534 autocommit = ms_is_autocommit();
00535 #ifndef DRIZZLED
00536 if (!autocommit)
00537 pbms_take_part_in_transaction(ms_my_get_thread());
00538 #endif
00539
00540 self->myIsAutoCommit = autocommit;
00541 }
00542
00543
00544 tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false , db_id, tab_id, blob_id, blob_ref_id);
00545
00546 self->myStmtCount++;
00547
00548 exit_();
00549 }
00550
00551