00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "cslib/CSConfig.h"
00030 #include <inttypes.h>
00031
00032 #include "cslib/CSGlobal.h"
00033
00034 #include "trans_cache_ms.h"
00035
// Growth step for a list. A list that starts growing is probably doing so
// because a backup is running, in which case it may become quite large.
#define LIST_INC_SIZE 256

// Smallest number of transaction slots a cache is created with; intended to
// be enough for a normal transaction load.
#define MIN_LIST_SIZE 32

// Number of records each transaction slot has room for when it is created.
#define MIN_CACHE_RECORDS 2
00039
// One cached transaction record. tc_AddRec() fills it from a log record and
// tc_GetRecAt() copies it back out.
struct myTrans {
	uint8_t		tc_type;		// Copied from the log record's tr_type.
	uint32_t	tc_db_id;		// Copied from tr_db_id; zeroed by tc_dropDatabase().
	uint32_t	tc_tab_id;		// Copied from tr_tab_id.
	bool		tc_rolled_back;	// Set when a partial rollback covers this record.
	uint64_t	tc_blob_id;		// Copied from tr_blob_id.
	uint64_t	tc_blob_ref_id;	// Copied from tr_blob_ref_id.
	uint64_t	tc_position;	// Log position the record was added with.
};

typedef struct myTrans myTransRec;
typedef struct myTrans *myTransPtr;
00049
// Marker stored in a slot's log_position until its first record arrives
// (all bits set).
#define BAD_LOG_POSITION (~((uint64_t) 0))
00051 typedef struct TransList {
00052 #ifdef DEBUG
00053 uint32_t old_tid;
00054 #endif
00055 uint32_t tid;
00056 uint64_t log_position;
00057 MS_TxnState terminated;
00058 size_t size;
00059 size_t len;
00060 myTransPtr list;
00061 } TransListRec, *TransListPtr;
00062
00063 MSTransCache::MSTransCache(): CSSharedRefObject(),
00064 tc_List(NULL),
00065 tc_OverFlow(NULL),
00066 tc_Size(0),
00067 tc_EOL(0),
00068 tc_First(0),
00069 tc_Used(0),
00070 tc_TotalTransCount(0),
00071 tc_TotalCacheCount(0),
00072 tc_ReLoadingThread(NULL),
00073 tc_OverFlowTID(0),
00074 tc_Full(false),
00075 tc_CacheVersion(0),
00076 tc_Recovering(false)
00077 {}
00078
00079 MSTransCache::~MSTransCache()
00080 {
00081 if (tc_List) {
00082 for (uint32_t i = 0; i < tc_Size; i++) {
00083 if (tc_List[i].list)
00084 cs_free(tc_List[i].list);
00085 }
00086 cs_free(tc_List);
00087 }
00088 }
00089
00090 MSTransCache *MSTransCache::newMSTransCache(uint32_t min_size)
00091 {
00092 MSTransCache *tl = NULL;
00093 enter_();
00094
00095 new_(tl, MSTransCache());
00096 push_(tl);
00097
00098 if (MIN_LIST_SIZE > min_size)
00099 min_size = MIN_LIST_SIZE;
00100
00101 tl->tc_Initialize(min_size);
00102
00103 pop_(tl);
00104
00105 return_(tl);
00106 }
00107
00108
00109 void MSTransCache::tc_Initialize(uint32_t size)
00110 {
00111 enter_();
00112
00113 tc_Size = size;
00114 size++;
00115 tc_List = (TransListPtr) cs_malloc(size * sizeof(TransListRec));
00116
00117
00118 for (uint32_t i = 0; i < tc_Size; i++) {
00119 tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
00120 tc_List[i].size = MIN_CACHE_RECORDS;
00121 tc_List[i].len = 0;
00122 tc_List[i].tid = 0;
00123 tc_List[i].log_position = 0;
00124 tc_List[i].terminated = MS_Running;
00125 }
00126
00127 tc_OverFlow = tc_List + tc_Size;
00128
00129 tc_OverFlow->list = NULL;
00130 tc_OverFlow->size = 0;
00131 tc_OverFlow->len = 0;
00132 tc_OverFlow->tid = 0;
00133 tc_OverFlow->log_position = 0;
00134 tc_OverFlow->terminated = MS_Running;
00135 exit_();
00136 }
00137
00138
00139 void MSTransCache::tc_SetSize(uint32_t cache_size)
00140 {
00141 enter_();
00142
00143 lock_(this);
00144
00145 if (cache_size < MIN_LIST_SIZE)
00146 cache_size = MIN_LIST_SIZE;
00147
00148
00149
00150 for (uint32_t i = cache_size +1; i < tc_Size; i++) {
00151 if (tc_List[i].list)
00152 cs_free(tc_List[i].list);
00153 }
00154
00155
00156 cs_realloc((void **) &tc_List, (cache_size +1) * sizeof(TransListRec));
00157
00158 if (cache_size > tc_Size) {
00159
00160 memcpy(tc_List + cache_size, tc_List + tc_Size, sizeof(TransListRec));
00161
00162 for (uint32_t i = tc_Size; i < cache_size; i++) {
00163 tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
00164 tc_List[i].size = MIN_CACHE_RECORDS;
00165 tc_List[i].len = 0;
00166 tc_List[i].tid = 0;
00167 tc_List[i].log_position = 0;
00168 tc_List[i].terminated = MS_Running;
00169 }
00170
00171 }
00172
00173
00174 tc_Size = cache_size;
00175 tc_OverFlow = tc_List + tc_Size;
00176
00177 unlock_(this);
00178
00179 exit_();
00180 }
00181
00182 bool MSTransCache::tc_ShoulReloadCache()
00183 {
00184 return (((tc_Used +1) < tc_Size) && tc_Full);
00185 }
00186
00187 uint64_t MSTransCache::tc_StartCacheReload(bool startup)
00188 {
00189 enter_();
00190
00191 (void) startup;
00192
00193 ASSERT((startup) || tc_Full);
00194 tc_ReLoadingThread = self;
00195 tc_OverFlowTID = tc_OverFlow->tid;
00196
00197 self->myTID = 0;
00198 self->myTransRef = 0;
00199 #ifdef DEBUG
00200 tc_ReloadCnt =0;
00201 #endif
00202
00203 return_(tc_OverFlow->log_position);
00204 }
00205
00206 bool MSTransCache::tc_ContinueCacheReload()
00207 {
00208
00209
00210
00211
00212
00213
00214 return ((tc_List[tc_First].terminated == MS_Running) ||
00215 (tc_OverFlow->tid == tc_OverFlowTID) ||
00216 (tc_OverFlow->terminated == MS_Running)
00217 );
00218 }
00219
00220
00221 void MSTransCache::tc_CompleteCacheReload()
00222 {
00223 enter_();
00224
00225 tc_ReLoadingThread = NULL;
00226 if (tc_OverFlowTID) {
00227 tc_OverFlow->tid = 0;
00228 tc_OverFlowTID = 0;
00229 tc_Full = false;
00230 }
00231
00232 exit_();
00233 }
00234
// Transaction reference of the overflow slot: the index just past the last
// regular slot.
#define OVERFLOW_TREF	(tc_Size)

// Returned for a transaction that was not placed in any slot at all.
#define MAX_TREF		(OVERFLOW_TREF + 1)
00237
00238
00239
00240 TRef MSTransCache::tc_NewTransaction(uint32_t tid)
00241 {
00242 TRef ref;
00243 enter_();
00244
00245 ASSERT(tid);
00246
00247 if (self != tc_ReLoadingThread) {
00248 tc_TotalTransCount++;
00249 }
00250
00251
00252
00253
00254
00255
00256
00257
00258 if (tc_Full) {
00259 if (tc_ReLoadingThread != self) {
00260 ref = MAX_TREF;
00261 goto done;
00262 }
00263
00264 #ifdef DEBUG
00265 if (!tc_ReloadCnt) {
00266 ASSERT(tc_OverFlow->tid == tid);
00267 }
00268 tc_ReloadCnt++;
00269 #endif
00270 if (tid == tc_OverFlowTID) {
00271 #ifdef DEBUG
00272 tc_OverFlow->old_tid = tid;
00273 #endif
00274 tc_OverFlow->tid = 0;
00275 tc_OverFlow->terminated = MS_Running;
00276 ASSERT((tc_Used +1) < tc_Size);
00277 } else if (tc_OverFlowTID == 0) {
00278
00279
00280 ref = MAX_TREF;
00281 goto done;
00282 }
00283 }
00284
00285 if ((tc_Used +1) == tc_Size){
00286
00287 tc_OverFlowTID = 0;
00288 tc_OverFlow->tid = tid;
00289 tc_OverFlow->log_position = BAD_LOG_POSITION;
00290 tc_OverFlow->len = 0;
00291 tc_OverFlow->terminated = MS_Running;
00292 ref = OVERFLOW_TREF;
00293 tc_Full = true;
00294 #ifdef DEBUG
00295 tc_ReloadCnt++;
00296 #endif
00297
00298 goto done;
00299 }
00300
00301 if (self != tc_ReLoadingThread) {
00302 tc_TotalCacheCount++;
00303 }
00304
00305 ref = tc_EOL;
00306
00307 #ifdef CHECK_TIDS
00308 {
00309 static uint32_t last_tid = 0;
00310 static bool last_state = false;
00311 if (tc_Recovering != last_state)
00312 last_tid = 0;
00313
00314 last_state = tc_Recovering;
00315 if (!( ((last_tid + 1) == tid) || !last_tid))
00316 printf("Expected tid %"PRIu32"\n", last_tid + 1);
00317 ASSERT( ((last_tid + 1) == tid) || !last_tid);
00318 last_tid = tid;
00319 }
00320 #endif
00321
00322 tc_List[ref].tid = tid;
00323 tc_List[ref].len = 0;
00324 tc_List[ref].log_position = BAD_LOG_POSITION;
00325 tc_List[ref].terminated = MS_Running;
00326
00327
00328
00329 tc_Used++;
00330 tc_EOL++;
00331
00332 if (tc_EOL == tc_Size)
00333 tc_EOL = 0;
00334
00335 done:
00336 self->myTID = tid;
00337 self->myTransRef = ref;
00338 self->myCacheVersion = tc_CacheVersion;
00339 return_(ref);
00340 }
00341
00342 void MSTransCache::tc_FindTXNRef(uint32_t tid, TRef *tref)
00343 {
00344 uint32_t i = tc_First;
00345 enter_();
00346
00347
00348 if (tc_First > tc_EOL) {
00349 for (; i < OVERFLOW_TREF && *tref >= MAX_TREF; i++) {
00350 if (tc_List[i].tid == tid)
00351 *tref = i;
00352 }
00353 i = 0;
00354 }
00355
00356 for (; i < tc_EOL && *tref >= MAX_TREF; i++) {
00357 if (tc_List[i].tid == tid)
00358 *tref = i;
00359 }
00360
00361
00362
00363
00364 if ((*tref >= MAX_TREF) && (tid == tc_OverFlow->tid) && (tid != tc_OverFlowTID))
00365 *tref = OVERFLOW_TREF;
00366
00367 self->myTID = tid;
00368 self->myTransRef = *tref;
00369 self->myCacheVersion = tc_CacheVersion;
00370 exit_();
00371 }
00372
00373
00374
00375
00376 void MSTransCache::tc_AddRec(uint64_t log_position, MSTransPtr rec, TRef tref)
00377 {
00378 TransListPtr lrec;
00379 enter_();
00380
00381 lock_(this);
00382
00383
00384 if (tref == TRANS_CACHE_UNKNOWN_REF) {
00385 ASSERT(tc_ReLoadingThread == self);
00386
00387 if ((self->myTID == rec->tr_id) && (self->myTransRef != TRANS_CACHE_UNKNOWN_REF))
00388 tref = self->myTransRef;
00389 else {
00390 tc_FindTXNRef(rec->tr_id, &tref);
00391 if (tref == TRANS_CACHE_UNKNOWN_REF) {
00392 if (!TRANS_IS_START(rec->tr_type))
00393 goto done;
00394
00395 tref = tc_NewTransaction(rec->tr_id);
00396 }
00397 }
00398 }
00399
00400 ASSERT((tref <= MAX_TREF) || (tref == TRANS_CACHE_NEW_REF));
00401 ASSERT(self->myTID == rec->tr_id);
00402
00403
00404 if (tref >= OVERFLOW_TREF) {
00405 if (tref == TRANS_CACHE_NEW_REF) {
00406 ASSERT(TRANS_IS_START(rec->tr_type));
00407 tref = tc_NewTransaction(rec->tr_id);
00408 } else if (self->myCacheVersion != tc_CacheVersion) {
00409
00410 tc_FindTXNRef(rec->tr_id, &tref);
00411 }
00412
00413 if (tref >= OVERFLOW_TREF){
00414 if (tref == OVERFLOW_TREF) {
00415 if (!tc_OverFlow->len)
00416 tc_OverFlow->log_position = log_position;
00417
00418 tc_OverFlow->len++;
00419 if (TRANS_IS_TERMINATED(rec->tr_type)) {
00420 if (rec->tr_type == MS_RollBackTxn)
00421 tc_OverFlow->terminated = MS_RolledBack;
00422 else if (rec->tr_type == MS_RecoveredTxn)
00423 tc_OverFlow->terminated = MS_Recovered;
00424 else
00425 tc_OverFlow->terminated = MS_Committed;
00426 }
00427 }
00428
00429 goto done;
00430 }
00431 }
00432
00433 lrec = tc_List + tref;
00434
00435 ASSERT(lrec->tid);
00436 ASSERT(lrec->tid == rec->tr_id);
00437
00438 if (!lrec->len) {
00439 lrec->log_position = log_position;
00440 } else if (( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn)) && !tc_Recovering) {
00441
00442
00443 for (uint32_t i = 0; i < lrec->len; i++) {
00444 if (lrec->list[i].tc_position == log_position)
00445 goto done;
00446 }
00447 }
00448
00449
00450 if (!tc_Recovering) {
00451 switch (TRANS_TYPE(rec->tr_type)) {
00452 case MS_RollBackTxn:
00453 case MS_Committed:
00454 case MS_RecoveredTxn:
00455
00456 break;
00457
00458 case MS_PartialRollBackTxn:
00459 {
00460
00461 for (uint32_t i = rec->tr_db_id;i < lrec->len; i++)
00462 lrec->list[i].tc_rolled_back = true;
00463
00464 break;
00465 }
00466
00467 case MS_ReferenceTxn:
00468 case MS_DereferenceTxn:
00469 {
00470 myTransPtr my_rec;
00471
00472 if (lrec->len == lrec->size) {
00473 cs_realloc((void **) &(lrec->list), (lrec->size + 10)* sizeof(myTransRec));
00474 lrec->size += 10;
00475 }
00476
00477 my_rec = lrec->list + lrec->len;
00478 my_rec->tc_type = rec->tr_type;
00479 my_rec->tc_db_id = rec->tr_db_id;
00480 my_rec->tc_tab_id = rec->tr_tab_id;
00481 my_rec->tc_blob_id = rec->tr_blob_id;
00482 my_rec->tc_blob_ref_id = rec->tr_blob_ref_id;
00483 my_rec->tc_position = log_position;
00484 my_rec->tc_rolled_back = false;
00485
00486 lrec->len++;
00487 break;
00488 }
00489
00490 }
00491 } else if ( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn))
00492 lrec->len++;
00493
00494
00495
00496
00497
00498 if (TRANS_IS_TERMINATED(rec->tr_type)) {
00499 if (rec->tr_type == MS_RollBackTxn)
00500 lrec->terminated = MS_RolledBack;
00501 else if (rec->tr_type == MS_RecoveredTxn)
00502 lrec->terminated = MS_Recovered;
00503 else
00504 lrec->terminated = MS_Committed;
00505 }
00506
00507 done:
00508 unlock_(this);
00509 exit_();
00510 }
00511
00512
00513
00514
00515 bool MSTransCache::tc_GetTransaction(TRef *ref, bool *terminated)
00516 {
00517 if (!tc_Used)
00518 return false;
00519
00520 ASSERT(tc_List[tc_First].tid);
00521
00522 *ref = tc_First;
00523 *terminated = (tc_List[tc_First].terminated != MS_Running);
00524
00525 return true;
00526 }
00527
00528
00529 bool MSTransCache::tc_GetTransactionStartPosition(uint64_t *log_position)
00530 {
00531 if ((!tc_Used) || (tc_List[tc_First].len == 0))
00532 return false;
00533
00534 *log_position = tc_List[tc_First].log_position;
00535 return true;
00536 }
00537
00538
00539 MS_TxnState MSTransCache::tc_TransactionState(TRef ref)
00540 {
00541 ASSERT((ref < tc_Size) && tc_List[ref].tid);
00542
00543 return tc_List[ref].terminated;
00544 }
00545
00546 uint32_t MSTransCache::tc_GetTransactionID(TRef ref)
00547 {
00548 ASSERT((ref < tc_Size) && tc_List[ref].tid);
00549
00550 return (tc_List[ref].tid);
00551 }
00552
00553
00554 void MSTransCache::tc_FreeTransaction(TRef tref)
00555 {
00556 TransListPtr lrec;
00557 enter_();
00558 ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
00559
00560 #ifdef CHECK_TIDS
00561 {
00562 static uint32_t last_tid = 0;
00563 static bool last_state = false;
00564 if (tc_Recovering != last_state)
00565 last_tid = 0;
00566
00567 last_state = tc_Recovering;
00568 ASSERT( ((last_tid + 1) == tc_List[tref].tid) || !last_tid);
00569 last_tid = tc_List[tref].tid;
00570 }
00571 #endif
00572
00573 lrec = tc_List + tref;
00574 #ifdef DEBUG
00575 lrec->old_tid = lrec->tid;
00576 #endif
00577 lrec->tid = 0;
00578 lrec->len = 0;
00579
00580 if (lrec->size > 10) {
00581 cs_realloc((void **) &(lrec->list), 10* sizeof(myTransRec));
00582 lrec->size = 10;
00583 }
00584
00585 lock_(this);
00586 tc_Used--;
00587
00588 if (tref == tc_First) {
00589 TRef eol = tc_EOL;
00590
00591
00592 if (tc_First > eol) {
00593 for (; tc_First < tc_Size && !tc_List[tc_First].tid; tc_First++) ;
00594
00595 if (tc_First == tc_Size)
00596 tc_First = 0;
00597 }
00598
00599 for (; tc_First < eol && !tc_List[tc_First].tid; tc_First++) ;
00600 }
00601
00602 ASSERT( (tc_Used == 0 && tc_First == tc_EOL) || (tc_Used != 0 && tc_First != tc_EOL));
00603
00604 unlock_(this);
00605
00606 exit_();
00607 }
00608
00609
00610 bool MSTransCache::tc_GetRecAt(TRef tref, size_t index, MSTransPtr rec, MS_TxnState *state)
00611 {
00612 TransListPtr lrec;
00613 bool found = false;
00614
00615 ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
00616 #ifdef CHECK_TIDS
00617 {
00618 static uint32_t last_tid = 0;
00619 ASSERT( ((last_tid + 1) == tc_List[tref].tid) || (last_tid == tc_List[tref].tid) || !last_tid);
00620 last_tid = tc_List[tref].tid;
00621 }
00622 #endif
00623
00624 lrec = tc_List + tref;
00625 if (index < lrec->len) {
00626 myTransPtr my_rec = lrec->list + index;
00627
00628 rec->tr_type = my_rec->tc_type;
00629 rec->tr_db_id = my_rec->tc_db_id;
00630 rec->tr_tab_id = my_rec->tc_tab_id;
00631 rec->tr_blob_id = my_rec->tc_blob_id;
00632 rec->tr_blob_ref_id = my_rec->tc_blob_ref_id;
00633 rec->tr_id = lrec->tid;
00634 rec->tr_check = 0;
00635 if (my_rec->tc_rolled_back)
00636 *state = MS_RolledBack;
00637 else
00638 *state = lrec->terminated;
00639
00640 found = true;
00641 }
00642
00643 return found;
00644 }
00645
00646
00647 void MSTransCache::tc_dropDatabase(uint32_t db_id)
00648 {
00649 enter_();
00650 lock_(this);
00651 if (tc_List) {
00652 for (uint32_t i = 0; i < tc_Size; i++) {
00653 myTransPtr rec = tc_List[i].list;
00654 if (rec) {
00655 uint32_t list_len = tc_List[i].len;
00656 while (list_len) {
00657 if (rec->tc_db_id == db_id)
00658 rec->tc_db_id = 0;
00659 list_len--;
00660 rec++;
00661 }
00662 }
00663 }
00664 }
00665
00666 unlock_(this);
00667 exit_();
00668 }