Drizzled Public API Documentation

trans_log_ms.cc
00001 /* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Barry Leslie
00020  *
00021  * 2009-06-09
00022  *
00023  * H&G2JCtL
00024  *
00025  * PBMS transaction handling.
00026  *
00027  * PBMS uses 1 circular transaction log. All BLOB reference operations are written to this log
00028  * and are applied to the repository when committed. There is 1 thread dedicated to reading the 
00029  * transaction log and applying the changes. During an engine level backup this thread is suspended 
00030  * so that no transactions will be applied to the repository files as they are backed up.
00031  *
00032  */
00033 #include "cslib/CSConfig.h"
00034 
00035 #include <stdlib.h>
00036 #include <inttypes.h>
00037 
00038 #include "cslib/CSGlobal.h"
00039 #include "cslib/CSStrUtil.h"
00040 #include "cslib/CSStorage.h"
00041 
00042 #include "trans_log_ms.h"
00043 #include "trans_cache_ms.h"
00044 
00045 #ifdef CRASH_TEST
00046 uint32_t  trans_test_crash_point;
00047 #define CRASH_POINT(p) { if (p == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %"PRIu64", eol: %"PRIu64"\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
00048 #else
00049 #define CRASH_POINT(p)
00050 #endif
00051 
00052 #define MS_TRANS_LOG_MAGIC      0xA6E7D7B3
00053 #define MS_TRANS_LOG_VERSION    1
00054 #define MS_TRANS_LOG_RECOVERED    0XA1
00055 #define MS_TRANS_LOG_NOT_RECOVERED  0XA2
00056 #define MS_TRANS_NO_OVERFLOW    0XB1
00057 #define MS_TRANS_OVERFLOW     0XB2
00058 
00059 #define DFLT_TRANS_CHECKPOINT_THRESHOLD 1024
00060 
00061 #define DFLT_TRANS_LOG_LIST_SIZE  (1024 * 10)
00062 #define DFLT_TRANS_CACHE_SIZE   (500)
00063 
00064 #define TRANS_CAN_RESIZE  ((txn_MaxRecords != txn_ReqestedMaxRecords) && (txn_EOL >= txn_Start) && !txn_HaveOverflow)
00065 
00066 typedef struct MSDiskTrans {
00067   CSDiskValue4  dtr_id_4;     // The transaction ID
00068   CSDiskValue1  dtr_type_1;     // The transaction type. If the first bit is set then the transaction is an autocommit.
00069   CSDiskValue1  dtr_check_1;    // The trransaction record checksum.
00070   CSDiskValue4  dtr_db_id_4;    // The database ID for the operation.
00071   CSDiskValue4  dtr_tab_id_4;   // The table ID for the operation.
00072   CSDiskValue8  dtr_blob_id_8;    // The blob ID for the operation.
00073   CSDiskValue8  dtr_blob_ref_id_8;  // The blob reference id.
00074 } MSDiskTransRec, *MSDiskTransPtr;
00075 
00076 #define SET_DISK_TRANSREC(d, s) { \
00077   CS_SET_DISK_4((d)->dtr_id_4, (s)->tr_id);\
00078   CS_SET_DISK_1((d)->dtr_type_1, (s)->tr_type);\
00079   CS_SET_DISK_1((d)->dtr_check_1, (s)->tr_check);\
00080   CS_SET_DISK_4((d)->dtr_db_id_4, (s)->tr_db_id);\
00081   CS_SET_DISK_4((d)->dtr_tab_id_4, (s)->tr_tab_id);\
00082   CS_SET_DISK_8((d)->dtr_blob_id_8, (s)->tr_blob_id);\
00083   CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
00084 }
00085 
00086 #define GET_DISK_TRANSREC(s, d) { \
00087   (s)->tr_id = CS_GET_DISK_4((d)->dtr_id_4);\
00088   (s)->tr_type = CS_GET_DISK_1((d)->dtr_type_1);\
00089   (s)->tr_check = CS_GET_DISK_1((d)->dtr_check_1);\
00090   (s)->tr_db_id = CS_GET_DISK_4((d)->dtr_db_id_4);\
00091   (s)->tr_tab_id = CS_GET_DISK_4((d)->dtr_tab_id_4);\
00092   (s)->tr_blob_id = CS_GET_DISK_8((d)->dtr_blob_id_8);\
00093   (s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
00094 }
00095 
00096 static uint8_t checksum(uint8_t *data, size_t len)
00097 {
00098   register uint32_t sum = 0, g;
00099   uint8_t       *chk;
00100 
00101   chk = data + len - 1;
00102   while (chk > data) {
00103     sum = (sum << 4) + *chk;
00104     if ((g = sum & 0xF0000000)) {
00105       sum = sum ^ (g >> 24);
00106       sum = sum ^ g;
00107     }
00108     chk--;
00109   }
00110   return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
00111 }
00112 
00113 MSTrans::MSTrans() : 
00114   CSSharedRefObject(),
00115   txn_MaxCheckPoint(0),
00116   txn_Doingbackup(false),
00117   txn_reader(NULL),
00118   txn_IsTxnValid(false),
00119   txn_CurrentTxn(0),
00120   txn_TxnIndex(0),
00121   txn_StartCheckPoint(0),
00122   txn_TransCache(NULL),
00123   txn_BlockingTransaction(0),
00124   txn_File(NULL),
00125   txn_EOLCheckPoint(0),
00126   txn_MaxRecords(0),
00127   txn_ReqestedMaxRecords(0),
00128   txn_HighWaterMark(0),
00129   txn_OverflowCount(0),
00130   txn_MaxTID(0),
00131   txn_Recovered(false),
00132   txn_HaveOverflow(false),
00133   txn_Overflow(0),
00134   txn_EOL(0),
00135   txn_Start(0),
00136   txn_Checksum(0)
00137 {
00138 }
00139 
00140 MSTrans::~MSTrans()
00141 {
00142   txn_Close();
00143   if (txn_TransCache) 
00144     txn_TransCache->release();
00145     
00146 } 
00147 
00148 void MSTrans::txn_Close()
00149 {
00150   
00151   if (txn_File) {   
00152     // Set the header to indicate that the log has not been closed properly. 
00153     CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
00154     txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
00155 
00156     CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00157     CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00158     CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
00159     txn_File->write(&(txn_DiskHeader.th_start_8), 
00160               offsetof(MSDiskTransHeadRec, th_start_8), 
00161               sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
00162 CRASH_POINT(1);
00163     txn_File->flush();
00164     txn_File->sync();
00165 
00166     if (txn_Recovered) {
00167       // Write the recovered flag seperately just incase of a crash during the write operation.
00168       CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);   
00169       txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
00170       txn_File->flush();
00171       txn_File->sync();
00172     }
00173     
00174     txn_File->close();
00175     txn_File->release();
00176     txn_File = NULL;    
00177   }
00178 }
00179 void MSTrans::txn_SetFile(CSFile *tr_file)
00180 {
00181   txn_File = tr_file;
00182 }
00183 
00184 //#define TRACE_ALL
00185 #ifdef TRACE_ALL
00186 static FILE *txn_debug_log;
00187 #endif
00188 
00189 MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
00190 {
00191   MSTrans *trans = NULL;
00192   CSPath *path = NULL;
00193   uint64_t log_size;
00194   enter_();
00195 
00196   (void) dump_log;
00197   
00198   new_(trans, MSTrans());
00199   push_(trans);
00200 
00201   path = CSPath::newPath(log_path);
00202   push_(path);  
00203     
00204 try_again:
00205   
00206   
00207   if (!path->exists()) { // Create the transaction log. 
00208     // Preallocate the log space and initialize it.
00209     MSDiskTransRec *recs;
00210     off64_t offset = sizeof(MSDiskTransHeadRec);
00211     uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
00212     size_t size;
00213     CSFile *tr_file;
00214     
00215     recs = (MSDiskTransRec *) cs_calloc(1024 * sizeof(MSDiskTransRec));
00216     push_ptr_(recs);
00217     
00218     tr_file = path->createFile(CSFile::CREATE);
00219     push_(tr_file);
00220     
00221     log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
00222 
00223     
00224     while (num_records) {
00225       if (num_records < 1024)
00226         size = num_records;
00227       else
00228         size = 1024;
00229       tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
00230       offset += size * sizeof(MSDiskTransRec);
00231       num_records -= size;
00232     }
00233 
00234     trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
00235     trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
00236     trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
00237     trans->txn_MaxTID = 1;
00238 
00239     // Initialize the log header.
00240     CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
00241     CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
00242     
00243     CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
00244 
00245     CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
00246 
00247     CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
00248     CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
00249 
00250     CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
00251     
00252     CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
00253     CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
00254     
00255     CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
00256     CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
00257     CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
00258     
00259     tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
00260     pop_(tr_file);
00261     trans->txn_SetFile(tr_file);
00262     
00263     trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
00264     
00265     trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
00266     
00267     release_(recs);
00268 
00269   } else { // The transaction log already exists
00270     bool overflow = false, recovered = false;
00271     
00272     CSFile *tr_file = path->createFile(CSFile::DEFAULT); // Open read/write
00273     push_(tr_file);
00274     
00275     // Read the log header:
00276     if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
00277       release_(tr_file);
00278       path->removeFile();
00279       goto try_again;
00280     }
00281     
00282     // check the log header:    
00283     if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
00284       CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
00285     
00286     if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
00287       CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
00288       
00289     //----
00290     if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW) 
00291       overflow = false;
00292     else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW) 
00293       overflow = true;
00294     else 
00295       CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
00296     
00297     //----
00298     if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED) 
00299       recovered = false;
00300     else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED) 
00301       recovered = true;
00302     else 
00303       CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
00304 
00305     // Check that the log is the expected size.
00306     log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
00307     
00308     if ((log_size > tr_file->getEOF()) || 
00309       ((log_size < tr_file->getEOF()) && !overflow)){ 
00310           
00311       char buffer[CS_EXC_MESSAGE_SIZE];   
00312       cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
00313       cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
00314       CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
00315     }
00316     
00317     trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
00318 
00319     // Looks good, we will assume it is a valid log file.
00320     trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
00321 
00322     pop_(tr_file);
00323     trans->txn_SetFile(tr_file);
00324 
00325     trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
00326     
00327     trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
00328     trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
00329     
00330     trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
00331     trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
00332     trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
00333     trans->txn_HaveOverflow = overflow;
00334     if (overflow) 
00335       trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec); 
00336     else
00337       trans->txn_Overflow = 0;
00338 
00339 #ifdef DEBUG
00340     if (overflow)
00341       printf("Recovering overflow log\n");
00342     if (dump_log) {
00343       char name[100];
00344       snprintf(name, 100, "%dms-trans-log.dump", (int)time(NULL));
00345       trans->txn_DumpLog(name);
00346     }
00347 #endif
00348     // Recover the log if required.
00349     if (!recovered)
00350       trans->txn_Recover();
00351       
00352       
00353   }
00354   
00355   trans->txn_Recovered = true; // Any recovery required has been completed.
00356 
00357   // The log has been recovered so these values should be valid:
00358   trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
00359   trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
00360   
00361   // Set the header to indicate that the log has not been closed properly. 
00362   // This is reset when the log is closed during shutdown.
00363   CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
00364   trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
00365   
00366   // Load the transaction records into memory.
00367   trans->txn_TransCache->tc_StartCacheReload(true);
00368   trans->txn_LoadTransactionCache(trans->txn_Start);
00369   trans->txn_TransCache->tc_CompleteCacheReload();
00370   
00371   if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords) 
00372     trans->txn_ResizeLog(); // Try to resize but it may not be possible yet.
00373   
00374   release_(path);
00375   pop_(trans);
00376 
00377 #ifdef TRACE_ALL
00378   
00379   txn_debug_log = fopen("log_dump.txt", "w+");
00380   if (!txn_debug_log) {
00381     perror("log_dump.txt");
00382   }
00383 #endif
00384   
00385   return_(trans);
00386 }
00387 
00388 bool MSTrans::txn_ValidRecord(MSTransPtr rec)
00389 {
00390   uint8_t check = rec->tr_check;
00391   bool ok;
00392   
00393   rec->tr_check = txn_Checksum;
00394   ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
00395   rec->tr_check = check;
00396   return ok;
00397 }
00398 
00399 void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
00400 {
00401   MSDiskTransRec drec;
00402   off64_t offset;
00403   
00404   // Read 1 record from the log and convert it from disk format.
00405   offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);   
00406   txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
00407   GET_DISK_TRANSREC(rec, &drec);
00408 }
00409 
00410 // Recovery involves finding the start of the first record and the eof
00411 // position. The positions will be found at or after the position stored
00412 // in the header. 
00413 void MSTrans::txn_Recover()
00414 {
00415   MSTransRec rec = {0,0,0,0,0,0,0};
00416   uint64_t original_eol = txn_EOL;
00417   enter_();
00418 
00419 #ifdef DEBUG
00420 printf("Recovering  transaction log!\n");
00421 #endif
00422 
00423   txn_MaxTID = 0;
00424   // Search for the last valid record in the log starting from the last 
00425   // known position stored in the header.
00426   for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
00427     txn_GetRecordAt(txn_EOL, &rec);
00428     if (! txn_ValidRecord(&rec))
00429       break;  
00430   }
00431   
00432   if (txn_EOL == txn_MaxRecords) {
00433     // It looks like all the records in the log are valid?
00434     // This is strange but could happen if the crash
00435     // occurred just before updating the header as the
00436     // eol position rolled over to the top of the log.
00437     txn_EOL = 0;
00438   }
00439   
00440   txn_MaxTID++;
00441   
00442   CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00443   
00444   // If the actual eol has moved pass the recorded start position
00445   // then the actuall start position must be some where beyond
00446   // the eol.
00447   if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start)) 
00448     txn_Start = txn_EOL +1; 
00449 
00450   // Position the start at the beginning of a transaction.  
00451   uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
00452   for (; txn_Start < end_search; txn_Start++) {
00453     txn_GetRecordAt(txn_Start, &rec);
00454     if (TRANS_IS_START(rec.tr_type))
00455       break;  
00456   }
00457 
00458   if (txn_Start == end_search)
00459     txn_Start = txn_EOL; 
00460     
00461   CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00462   
00463   txn_TransCache->tc_SetRecovering(true); 
00464   // Load the transaction records into the cache.
00465   txn_TransCache->tc_StartCacheReload(true);
00466   txn_LoadTransactionCache(txn_Start);
00467   txn_TransCache->tc_CompleteCacheReload();
00468   
00469   // Now go through all the transactions and add rollbacks for any
00470   // unterminated transactions.
00471   TRef ref;
00472   bool terminated;
00473   while  (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
00474     
00475     txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref); // Save the TID of the last transaction.
00476     if (!terminated) {
00477       self->myTID = txn_MaxTID;
00478       self->myTransRef = ref;
00479       self->myStartTxn = false;
00480       txn_AddTransaction(MS_RecoveredTxn);
00481     }
00482 CRASH_POINT(2);
00483     txn_TransCache->tc_FreeTransaction(ref);
00484     
00485     // Load the next block of transactions into the cache.
00486     // This needs to be done after each tc_GetTransaction() to make sure
00487     // that if the transaction terminator is some where in the log
00488     // it will get read even if the cache is completely full. 
00489     if (txn_TransCache->tc_ShoulReloadCache()) {
00490       txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
00491       txn_TransCache->tc_CompleteCacheReload();
00492     }
00493   }
00494   
00495   
00496   txn_TransCache->tc_SetRecovering(false); 
00497   self->myTransRef = 0;
00498   
00499   // Update the header again incase rollbacks have been added.
00500   CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00501         
00502   exit_();
00503 }
00504 
00505 bool ReadTXNLog::rl_CanContinue() 
00506 { 
00507   return rl_log->txn_TransCache->tc_ContinueCacheReload();
00508 }
00509 
00510 void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec) 
00511 {
00512   rl_log->txn_TransCache->tc_AddRec(log_position, rec);
00513 }
00514 
00515 void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec) 
00516 {
00517   MSDiskTransRec drec;
00518   SET_DISK_TRANSREC(&drec, rec);
00519 
00520   rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
00521 }
00522 
00523 void ReadTXNLog::rl_Flush() 
00524 {
00525   rl_log->txn_File->flush();
00526   rl_log->txn_File->sync();
00527 }
00528 
00529 void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
00530 {
00531   uint64_t  size, orig_size;
00532   bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
00533   enter_();
00534   
00535 
00536   // Get the number of transaction records to be loaded.
00537   if (reading_overflow) {
00538     orig_size = rl_log->txn_Overflow;
00539     size = rl_log->txn_Overflow - read_start;
00540   } else {
00541     orig_size = rl_log->txn_GetNumRecords();
00542   
00543     if (rl_log->txn_Start <= read_start)
00544       size = orig_size - (read_start - rl_log->txn_Start);
00545     else 
00546       size = rl_log->txn_EOL - read_start;
00547   }
00548   
00549   // load all the records
00550   while (size && rl_CanContinue()) {
00551     MSDiskTransRec diskRecords[1000];
00552     uint32_t read_size;
00553     off64_t offset;
00554     
00555     if (size > 1000) 
00556       read_size = 1000 ;
00557     else
00558       read_size = size ;
00559     
00560     // Check if we have reached the wrap around point in the log.
00561     if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
00562       read_size = rl_log->txn_MaxRecords - read_start ;
00563 
00564     // Read the next block of records.
00565     offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);    
00566     rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
00567     
00568     // Convert the records from disk format and add them to the cache.
00569     for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
00570       MSTransRec rec;
00571       MSDiskTransPtr drec = diskRecords + i;
00572       GET_DISK_TRANSREC(&rec, drec);
00573       
00574       rl_Load(read_start + i, &rec); 
00575     }
00576     
00577     size -= read_size;
00578     read_start += read_size;
00579     if (read_start == rl_log->txn_MaxRecords)
00580       read_start = 0;
00581   }
00582   
00583   if (rl_log->txn_HaveOverflow && !reading_overflow) {
00584     if (rl_CanContinue()) 
00585       rl_ReadLog(rl_log->txn_MaxRecords, false);
00586     
00587   } else if (!log_locked) { 
00588     // The following is intended to prevent the case where a writer 
00589     // writes an txn record while the cache is full but just after 
00590     // the reload has completed. If the cache is not yet full we need
00591     // to load as many of the new records into cache as possible.
00592   
00593     uint64_t  new_size;
00594     lock_(rl_log);
00595     if (reading_overflow)
00596       new_size = rl_log->txn_Overflow;
00597     else 
00598       new_size = rl_log->txn_GetNumRecords();
00599     if (rl_CanContinue() && (orig_size != new_size)) {
00600       rl_ReadLog(read_start, true);
00601     }
00602     unlock_(rl_log);
00603   }
00604   
00605 
00606   exit_();
00607 }
00608 
00609 void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
00610 {
00611   ReadTXNLog log(this);
00612   enter_();
00613   log.rl_ReadLog(read_start, false);
00614   txn_TransCache->tc_UpdateCacheVersion(); // Signal writes to recheck cache for overflow txn refs.
00615   exit_();
00616 }
00617 
00618 void  MSTrans::txn_ResizeLog()
00619 {
00620   enter_();
00621   
00622   lock_(this);
00623   if (TRANS_CAN_RESIZE) {
00624     // TRANS_CAN_RESIZE checks that there is no overflow and the the start position 
00625     // is less than eol. This implies the from eol to the end of file doesn't contain
00626     // and used records.
00627     
00628 
00629 #ifdef DEBUG  
00630     uint64_t old_size = txn_MaxRecords;
00631 #endif    
00632     if (txn_MaxRecords > txn_ReqestedMaxRecords) { // Shrink the log
00633       uint64_t max_resize = txn_MaxRecords - txn_EOL;
00634       
00635       if ( txn_Start == txn_EOL)
00636         max_resize = txn_MaxRecords;
00637       else {
00638         max_resize = txn_MaxRecords - txn_EOL;
00639         if (!txn_Start) // If start is at '0' then the EOL cannot be wrapped.
00640           max_resize--;
00641       }
00642       
00643         
00644       if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
00645         max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
00646               
00647       txn_MaxRecords -=   max_resize;
00648     } else
00649       txn_MaxRecords = txn_ReqestedMaxRecords; // Grow the log
00650 
00651 #ifdef DEBUG      
00652     char buffer[CS_EXC_MESSAGE_SIZE];   
00653     snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n",  old_size, txn_MaxRecords);
00654     CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
00655 #endif
00656 
00657     CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
00658     
00659     txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
00660     txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
00661     
00662     if (txn_Start == txn_EOL) {
00663       txn_Start = 0;
00664       txn_EOL = 0;
00665     } else if (txn_MaxRecords == txn_EOL) {
00666       txn_EOL = 0;
00667     }
00668     
00669     txn_ResetEOL();
00670         
00671   } 
00672   unlock_(this);
00673   
00674   exit_();
00675 }
00676 
00677 void  MSTrans::txn_ResetEOL()
00678 {
00679   enter_();
00680   
00681   txn_EOLCheckPoint = txn_MaxCheckPoint;
00682   txn_StartCheckPoint = txn_MaxCheckPoint;
00683   
00684   if (!txn_EOL)
00685     txn_Checksum++;
00686   CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00687   CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00688   CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
00689   txn_File->write(&(txn_DiskHeader.th_start_8), 
00690             offsetof(MSDiskTransHeadRec, th_start_8), 
00691             sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
00692 CRASH_POINT(5);
00693   txn_File->flush();
00694   txn_File->sync();
00695 CRASH_POINT(10);
00696     
00697   exit_();
00698 }
00699 
00700 #define PRINT_TRANS(tid, a, t) 
00701 
00702 #ifndef PRINT_TRANS
00703 #define PRINT_TRANS(tid, a, t) printTrans(tid, a, t)
00704 static void printTrans(uint32_t tid, bool autocommit, MS_Txn type)
00705 {
00706   const char *type_name = "???";
00707   
00708   switch (type) {
00709     case MS_RollBackTxn:
00710       type_name = "Rollback";
00711       break;
00712     case MS_PartialRollBackTxn:
00713       type_name = "PartialRollBack";
00714       break;
00715     case MS_CommitTxn:
00716       type_name = "Commit";
00717       break;
00718     case MS_ReferenceTxn:
00719       type_name = "Reference";
00720       break;
00721     case MS_DereferenceTxn:
00722       type_name = "Dereference";
00723       break;
00724     case MS_RecoveredTxn:
00725       type_name = "Recovered";
00726       break;
00727   }
00728   
00729   fprintf(stderr, "MSTrans::txn_LogTransaction(%d, autocommit = %s, %s)\n", tid, (autocommit)?"On":"Off", type_name);
00730 
00731 }
00732 #endif
00733 
00734 void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
00735 {
00736   enter_();
00737   
00738   lock_(this);
00739   if (!self->myTID) {
00740     switch (type) {
00741       case MS_RollBackTxn:
00742       case MS_PartialRollBackTxn:
00743       case MS_CommitTxn: {
00744         unlock_(this);
00745         exit_();;
00746       }
00747       case MS_ReferenceTxn:
00748       case MS_DereferenceTxn:
00749       case MS_RecoveredTxn:
00750         break;
00751     }
00752     txn_MaxTID++;
00753     self->myTID = txn_MaxTID;
00754     self->myTransRef = TRANS_CACHE_NEW_REF;
00755     self->myStartTxn = true;
00756   }
00757 
00758   PRINT_TRANS(self->myTID, autocommit, type);
00759   
00760   txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
00761   if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
00762     txn_NewTransaction();
00763     
00764   unlock_(this);
00765   
00766   exit_();
00767 }
00768 
00769 void  MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
00770 {
00771   MSTransRec rec = {0,0,0,0,0,0,0}; // This must be set to zero so that the checksum will be valid. 
00772   MSDiskTransRec drec;
00773   uint64_t new_offset = txn_EOL;
00774   bool do_flush = true;
00775 
00776   enter_();
00777 
00778   
00779   // Check that the log is not already full.
00780   if (txn_IsFull()) {   
00781     if (!txn_HaveOverflow) { // The first overflow record: update the header.
00782       CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
00783       txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
00784 
00785       CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00786       CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00787       txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
00788 
00789       txn_File->flush();
00790       txn_File->sync();
00791       txn_HaveOverflow = true;
00792       txn_OverflowCount++;
00793       txn_Overflow =  txn_MaxRecords;   
00794     }
00795     
00796     new_offset = txn_Overflow;
00797   }
00798     
00799   rec.tr_id = self->myTID ;
00800   rec.tr_type = tran_type;
00801   rec.tr_db_id = db_id;
00802   rec.tr_tab_id = tab_id;
00803   rec.tr_blob_id = blob_id;
00804   rec.tr_blob_ref_id = blob_ref_id;
00805   
00806   if (self->myStartTxn) {
00807     TRANS_SET_START(rec.tr_type);
00808     self->myStartTxn = false;
00809   }
00810     
00811   if (autocommit) {
00812     TRANS_SET_AUTOCOMMIT(rec.tr_type);
00813   }
00814     
00815 #ifdef TRACE_ALL
00816 if (txn_debug_log){
00817 char *ttype, *cmt;
00818 switch (TRANS_TYPE(rec.tr_type)) {
00819   case MS_ReferenceTxn:
00820     ttype = "+";
00821     break;
00822   case MS_DereferenceTxn:
00823     ttype = "-";
00824     break;
00825   case MS_RollBackTxn:
00826     ttype = "rb";
00827     rec.tr_blob_ref_id = 0;
00828     break;
00829   case MS_RecoveredTxn:
00830     ttype = "rcov";
00831     rec.tr_blob_ref_id = 0;
00832     break;
00833   default:
00834     ttype = "???";
00835 }
00836 
00837 if (TRANS_IS_TERMINATED(rec.tr_type))
00838   cmt = "c";
00839 else
00840   cmt = "";
00841       
00842 fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64"  %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
00843 }
00844 #endif
00845 
00846   rec.tr_check = txn_Checksum; 
00847   
00848   // Calculate the records checksum.
00849   rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
00850   
00851   // Write the record to disk.
00852   SET_DISK_TRANSREC(&drec, &rec);
00853 #ifdef CRASH_TEST
00854 
00855   if (trans_test_crash_point == 9) { // do a partial write before crashing
00856     txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
00857     CRASH_POINT(9);
00858   } else
00859     txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
00860 #else 
00861   txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
00862 #endif
00863 CRASH_POINT(3);
00864   // There is no need to sync if the transaction is still running.
00865   if (TRANS_IS_TERMINATED(tran_type)) {
00866     CRASH_POINT(4); // This crash will result in a verify error because the txn was committed to the log but not the database.
00867     txn_File->flush();
00868     txn_File->sync();
00869     do_flush = false;
00870   }
00871   
00872   if (!txn_HaveOverflow) { // No need to update the header if overflowing.
00873     uint64_t rec_offset = txn_EOL;
00874     
00875     txn_EOL = new_offset;
00876     txn_EOL++;
00877     
00878     if (txn_EOL == txn_MaxRecords) {
00879       // The eol has rolled over.
00880       txn_EOL = 0;    
00881     }
00882 
00883     txn_EOLCheckPoint--;
00884     if ((!txn_EOLCheckPoint) || !txn_EOL) {
00885       
00886       // Flush the previouse write if required before updating the header.
00887       // This is just in case it crashes during the sync to make sure that the
00888       // header information is correct for the data on disk. If the crash occurred
00889       // between writing the header and the record the header on disk would be wrong.
00890       if (do_flush) {
00891         txn_File->flush();
00892         txn_File->sync();
00893       }
00894       
00895       txn_ResetEOL();
00896     }
00897     
00898     txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
00899     
00900     if (txn_GetNumRecords() > txn_HighWaterMark)
00901       txn_HighWaterMark = txn_GetNumRecords();
00902       
00903   } else { // Ovewrflow
00904     txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
00905     txn_Overflow++;
00906     if (txn_Overflow > txn_HighWaterMark)
00907       txn_HighWaterMark = txn_Overflow;
00908   }
00909   
00910   ASSERT(txn_EOL < txn_MaxRecords);
00911   ASSERT(txn_Start < txn_MaxRecords);
00912   exit_();
00913 }
00914 
00915 uint64_t MSTrans::txn_GetSize()   
00916 {
00917   return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
00918 }
00919 
00920 //---------------
00921 void MSTrans::txn_NewTransaction()
00922 { 
00923   enter_();
00924 
00925   self->myTID = 0;  // This will be assigned when the first record is written.
00926   
00927   exit_();
00928 }
00929 
00930 //---------------
00931 void MSTrans::txn_PerformIdleTasks()
00932 {
00933   enter_();
00934   
00935   if (txn_TransCache->tc_ShoulReloadCache()) {
00936     txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
00937     txn_TransCache->tc_CompleteCacheReload();
00938     exit_();
00939   }
00940   
00941   // During backup the reader is suspended. This may need to be changed
00942   // if we decide to actually do something here.
00943   txn_reader->suspendedWait(1000);
00944   exit_();
00945 }
00946 
00947 //---------------
00948 void MSTrans::txn_ResetReadPosition(uint64_t pos)
00949 { 
00950   bool rollover = (pos < txn_Start);
00951   enter_();
00952   
00953   if (pos >= txn_MaxRecords) { // Start of overflow
00954     lock_(this);
00955     
00956     // Overflow has occurred and the circular list is now empty 
00957     // so expand the list to include the overflow and 
00958     // reset txn_Start and txn_EOL
00959     txn_Start = txn_MaxRecords;
00960     txn_MaxRecords = txn_Overflow;
00961     txn_EOL = 0;
00962     txn_HaveOverflow = false;
00963     txn_Overflow = 0;
00964     
00965     CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
00966     CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
00967     txn_File->write(&(txn_DiskHeader.th_overflow_1),  offsetof(MSDiskTransHeadRec, th_overflow_1),  1);
00968     txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
00969         
00970     txn_ResetEOL();
00971     
00972     unlock_(this);
00973   } else
00974     txn_Start = pos;  
00975   
00976   ASSERT(txn_Start <= txn_MaxRecords);
00977   
00978   if (!rollover)
00979     txn_StartCheckPoint -= (pos - txn_Start);
00980   
00981   // Flush the header if the read position has rolled over or it is time. 
00982   if ( rollover || (txn_StartCheckPoint <=0)) {
00983     lock_(this);
00984     CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00985     CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00986     txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
00987 CRASH_POINT(5);
00988     txn_File->flush();
00989     txn_File->sync();
00990     txn_StartCheckPoint = txn_MaxCheckPoint;
00991     unlock_(this);
00992   }
00993   
00994 CRASH_POINT(6);
00995   
00996   if (TRANS_CAN_RESIZE) 
00997     txn_ResizeLog();
00998     
00999   exit_();
01000 }
01001 //---------------
01002 bool MSTrans::txn_haveNextTransaction() 
01003 {
01004   bool terminated = false;
01005   TRef ref;
01006   
01007   txn_TransCache->tc_GetTransaction(&ref, &terminated);
01008   
01009   return terminated;
01010 }
01011 
01012 //---------------
01013 void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
01014 {
01015   bool terminated;
01016   uint64_t log_position;
01017   enter_();
01018   
01019   ASSERT(txn_reader == self);
01020   lock_(txn_reader);
01021   
01022   do {
01023     // Get the next completed transaction.
01024     // this will suspend the current thread, which is assumed
01025     // to be the log reader, until one is available.
01026     while ((!txn_IsTxnValid) && !self->myMustQuit) {
01027 
01028       // wait until backup has completed.
01029       while (txn_Doingbackup && !self->myMustQuit)
01030         txn_PerformIdleTasks();
01031   
01032       if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
01033         txn_IsTxnValid = true;
01034         txn_TxnIndex = 0;
01035       } else
01036         txn_PerformIdleTasks();
01037     }
01038     
01039     if (self->myMustQuit)
01040       break;
01041       
01042     if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state)) 
01043       break;
01044       
01045 CRASH_POINT(7);
01046     txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
01047 CRASH_POINT(8);
01048     if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
01049       txn_ResetReadPosition(log_position);
01050     }else{
01051       if (txn_TransCache->tc_ShoulReloadCache()) {
01052         uint64_t pos = txn_TransCache->tc_StartCacheReload();
01053         txn_ResetReadPosition(pos);
01054         txn_LoadTransactionCache(pos);
01055         txn_TransCache->tc_CompleteCacheReload();
01056       } else {
01057         // Lock the object to prevent writer thread updates while I check again.
01058         // This is to ensure that txn_EOL is not changed between the call to
01059         // tc_GetTransactionStartPosition() and setting the read position.
01060         lock_(this);
01061         if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) 
01062           txn_ResetReadPosition(log_position);
01063         else
01064           txn_ResetReadPosition(txn_EOL);
01065         unlock_(this);
01066       }
01067     }
01068     
01069     txn_IsTxnValid = false;
01070       
01071   } while (1);
01072   
01073   unlock_(txn_reader);
01074   exit_();
01075 }
01076 
01077 
01078 void MSTrans::txn_GetStats(MSTransStatsPtr stats)
01079 {
01080   
01081   if (txn_HaveOverflow) {
01082     stats->ts_IsOverflowing = true;
01083     stats->ts_LogSize = txn_Overflow;
01084   } else {
01085     stats->ts_IsOverflowing = false;
01086     stats->ts_LogSize = txn_GetNumRecords();
01087   }
01088   stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
01089 
01090   stats->ts_MaxSize = txn_HighWaterMark;
01091   stats->ts_OverflowCount = txn_OverflowCount;
01092   
01093   stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
01094   stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
01095   stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
01096 }
01097 
01098 void MSTrans::txn_SetCacheSize(uint32_t new_size)
01099 {
01100   enter_();
01101   // Important lock order. Writer threads never lock the reader but the reader
01102   // may lock this object so always lock the reader first.
01103   lock_(txn_reader);
01104   lock_(this);
01105 
01106   CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
01107   
01108   txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
01109   txn_File->flush();
01110   txn_File->sync();
01111 
01112   txn_TransCache->tc_SetSize(new_size);
01113 
01114   unlock_(this);
01115   unlock_(txn_reader);
01116   exit_();
01117 }
01118 
01119 void MSTrans::txn_SetLogSize(uint64_t new_size)
01120 {
01121   enter_();
01122   
01123   // Important lock order. Writer threads never lock the reader but the reader
01124   // may lock this object so always lock the reader first.
01125   lock_(txn_reader);
01126   lock_(this);
01127   
01128   txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
01129   
01130   if (txn_ReqestedMaxRecords < 10)
01131     txn_ReqestedMaxRecords = 10;
01132   
01133   CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
01134   
01135   txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
01136   txn_File->flush();
01137   txn_File->sync();
01138   
01139   unlock_(this);
01140   unlock_(txn_reader);
01141   
01142   exit_();
01143 }
01144 
01145 // A helper class for resetting database IDs in the transaction log.
01146 class DBSearchTXNLog : ReadTXNLog {
01147   public:
01148   DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
01149   
01150   uint32_t sdb_db_id; 
01151   bool sdb_isDirty;
01152   
01153   virtual bool rl_CanContinue() { return true;}
01154   virtual void rl_Load(uint64_t log_position, MSTransPtr rec) 
01155   {
01156     if  (rec->tr_db_id == sdb_db_id) {
01157       sdb_isDirty = true;
01158       rec->tr_db_id = 0;
01159       rl_Store(log_position, rec);
01160     } 
01161   }
01162   
01163   void SetDataBaseIDToZero(uint32_t db_id)
01164   {
01165     sdb_db_id = db_id;
01166     rl_ReadLog(rl_log->txn_GetStartPosition(), false);
01167     if (sdb_isDirty)
01168       rl_Flush();
01169   }
01170 };
01171 
01172 // Dropping the database from the transaction log just involves
01173 // scanning the log and setting the database id of any transactions 
01174 // involving the dropped database to zero.
01175 void MSTrans::txn_dropDatabase(uint32_t db_id)
01176 {
01177   enter_();
01178   
01179   // Important lock order. Writer threads never lock the reader but the reader
01180   // may lock this object so always lock the reader first.
01181   lock_(txn_reader);
01182   lock_(this);
01183   
01184   // Clear any transaction records in the cache for the dropped database;
01185   txn_TransCache->tc_dropDatabase(db_id);
01186   
01187   // Scan the log setting the database ID for any record belonging to the
01188   // dropped database to zero. 
01189   DBSearchTXNLog searchLog(this);
01190   
01191   searchLog.SetDataBaseIDToZero(db_id);
01192     
01193   unlock_(this);
01194   unlock_(txn_reader);
01195   exit_();
01196 }
01197 
01198 #ifdef DEBUG  
01199 void MSTrans::txn_DumpLog(const char *file)
01200 {
01201   size_t  size, read_start = 0;
01202   FILE *fptr;
01203   enter_();
01204   
01205   fptr = fopen(file, "w+");
01206   if (!fptr) {
01207     perror(file);
01208     return;
01209   }
01210   
01211   if (txn_Overflow)
01212     size = txn_Overflow;
01213   else
01214     size = txn_MaxRecords;
01215   
01216   // Dump all the records
01217   while (size) {
01218     MSDiskTransRec diskRecords[1000];
01219     uint32_t read_size;
01220     off64_t offset;
01221     
01222     if (size > 1000) 
01223       read_size = 1000 ;
01224     else
01225       read_size = size ;
01226     
01227     // Read the next block of records.
01228     offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);    
01229     txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
01230     
01231     for (uint32_t i = 0; i < read_size; i++) {
01232       const char *ttype, *cmt;
01233       MSTransRec rec;
01234       MSDiskTransPtr drec = diskRecords + i;
01235       GET_DISK_TRANSREC(&rec, drec);
01236       
01237       switch (TRANS_TYPE(rec.tr_type)) {
01238         case MS_ReferenceTxn:
01239           ttype = "+";
01240           break;
01241         case MS_DereferenceTxn:
01242           ttype = "-";
01243           break;
01244         case MS_RollBackTxn:
01245           ttype = "rb";
01246           rec.tr_blob_ref_id = 0;
01247           break;
01248         case MS_RecoveredTxn:
01249           ttype = "rcov";
01250           rec.tr_blob_ref_id = 0;
01251           break;
01252         default:
01253           ttype = "???";
01254       }
01255       
01256       if (TRANS_IS_TERMINATED(rec.tr_type))
01257         cmt = "c";
01258       else
01259         cmt = "";
01260       
01261       
01262       fprintf(fptr, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, 
01263         ((read_start + i) == txn_Start) ? "START":"",
01264         ((read_start + i) == txn_EOL) ? "EOL":"",
01265         ((read_start + i) == txn_MaxRecords) ? "OverFlow":""
01266         );
01267     }
01268     
01269     size -= read_size;
01270     read_start += read_size;
01271   }
01272   fclose(fptr);
01273   exit_();
01274 } 
01275 
01276 #endif
01277