00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "cslib/CSConfig.h"
00031
00032 #include <stddef.h>
00033
00034 #include "defs_ms.h"
00035
00036 #include "cslib/CSGlobal.h"
00037 #include "cslib/CSStrUtil.h"
00038 #include "cslib/CSStorage.h"
00039
00040 #include "temp_log_ms.h"
00041 #include "open_table_ms.h"
00042 #include "trans_log_ms.h"
00043 #include "transaction_ms.h"
00044 #include "parameters_ms.h"
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 class SearchTXNLog : ReadTXNLog {
00055 public:
00056 SearchTXNLog(uint32_t db_id, MSTrans *log): ReadTXNLog(log), st_db_id(db_id) {}
00057
00058 bool st_found;
00059 bool st_terminated;
00060 bool st_commited;
00061 uint32_t st_tid;
00062 uint32_t st_db_id;
00063 uint32_t st_tab_id;
00064 uint64_t st_blob_id;
00065
00066 virtual bool rl_CanContinue() { return ((!st_found) || !st_terminated);}
00067 virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
00068 {
00069 (void) log_position;
00070
00071 if ( !st_found && (TRANS_TYPE(rec->tr_type) != MS_ReferenceTxn))
00072 return;
00073
00074 if (!st_found) {
00075 if ((rec->tr_db_id == st_db_id) && (rec->tr_tab_id == st_tab_id) && (rec->tr_blob_id == st_blob_id)) {
00076 st_found = true;
00077 st_tid = rec->tr_id;
00078 } else
00079 return;
00080 }
00081 st_terminated = TRANS_IS_TERMINATED(rec->tr_type);
00082 if (st_terminated)
00083 st_commited = (TRANS_IS_AUTOCOMMIT(rec->tr_type) || (TRANS_TYPE(rec->tr_type) == MS_CommitTxn));
00084 }
00085
00086 bool st_FindBlobRef(bool *committed, uint32_t tab_id, uint64_t blob_id)
00087 {
00088 enter_();
00089 st_found = st_terminated = st_commited = false;
00090 st_tab_id = tab_id;
00091 st_blob_id = blob_id;
00092
00093 rl_ReadLog(rl_log->txn_GetStartPosition(), false);
00094 *committed = st_commited;
00095 return_(st_found);
00096 }
00097 };
00098
00099 MSTempLogFile::MSTempLogFile():
00100 CSReadBufferedFile(),
00101 myTempLogID(0),
00102 myTempLog(NULL)
00103 {
00104 }
00105
00106 MSTempLogFile::~MSTempLogFile()
00107 {
00108 close();
00109 if (myTempLog)
00110 myTempLog->release();
00111 }
00112
00113 MSTempLogFile *MSTempLogFile::newTempLogFile(uint32_t id, MSTempLog *temp_log, CSFile *file)
00114 {
00115 MSTempLogFile *f;
00116 enter_();
00117
00118 push_(temp_log);
00119 push_(file);
00120
00121 if (!(f = new MSTempLogFile()))
00122 CSException::throwOSError(CS_CONTEXT, ENOMEM);
00123
00124 f->myTempLogID = id;
00125
00126 pop_(file);
00127 f->setFile(file);
00128
00129 pop_(temp_log);
00130 f->myTempLog = temp_log;
00131 return_(f);
00132 }
00133
00134 MSTempLog::MSTempLog(uint32_t id, MSDatabase *db, off64_t file_size):
00135 CSRefObject(),
00136 myLogID(id),
00137 myTempLogSize(file_size),
00138 myTemplogRecSize(0),
00139 myTempLogHeadSize(0),
00140 iLogDatabase(db),
00141 iDeleteLog(false)
00142 {
00143 }
00144
00145 MSTempLog::~MSTempLog()
00146 {
00147 enter_();
00148 if (iDeleteLog) {
00149 CSPath *path;
00150
00151 path = getLogPath();
00152 push_(path);
00153 path->removeFile();
00154 release_(path);
00155 }
00156 exit_();
00157 }
00158
00159 void MSTempLog::deleteLog()
00160 {
00161 iDeleteLog = true;
00162 }
00163
00164 CSPath *MSTempLog::getLogPath()
00165 {
00166 char file_name[120];
00167
00168 cs_strcpy(120, file_name, "bs-logs");
00169 cs_add_dir_char(120, file_name);
00170 cs_strcat(120, file_name, "temp-");
00171 cs_strcat(120, file_name, myLogID);
00172 cs_strcat(120, file_name, ".bs");
00173 return CSPath::newPath(RETAIN(iLogDatabase->myDatabasePath), file_name);
00174 }
00175
00176 MSTempLogFile *MSTempLog::openTempLog()
00177 {
00178 CSPath *path;
00179 MSTempLogFile *fh;
00180
00181 enter_();
00182 path = getLogPath();
00183 retain();
00184 fh = MSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
00185 push_(fh);
00186 if (myTempLogSize)
00187 fh->open(CSFile::DEFAULT);
00188 else
00189 fh->open(CSFile::CREATE);
00190 if (!myTempLogHeadSize) {
00191 MSTempLogHeadRec head;
00192
00193 lock_(iLogDatabase->myTempLogArray);
00194
00195 if (!myTempLogHeadSize) {
00196 size_t rem;
00197
00198 if (fh->read(&head, 0, offsetof(MSTempLogHeadRec, th_reserved_4), 0) < offsetof(MSTempLogHeadRec, th_reserved_4)) {
00199 CS_SET_DISK_4(head.th_magic_4, MS_TEMP_LOG_MAGIC);
00200 CS_SET_DISK_2(head.th_version_2, MS_TEMP_LOG_VERSION);
00201 CS_SET_DISK_2(head.th_head_size_2, MS_TEMP_LOG_HEAD_SIZE);
00202 CS_SET_DISK_2(head.th_rec_size_2, sizeof(MSTempLogItemRec));
00203 CS_SET_DISK_4(head.th_reserved_4, 0);
00204 fh->write(&head, 0, sizeof(MSTempLogHeadRec));
00205 fh->flush();
00206 }
00207
00208
00209 if (CS_GET_DISK_4(head.th_magic_4) != MS_TEMP_LOG_MAGIC)
00210 CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
00211 if (CS_GET_DISK_2(head.th_version_2) > MS_TEMP_LOG_VERSION)
00212 CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
00213
00214
00215 myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
00216 myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
00217
00218
00219 if (myTempLogSize < myTempLogHeadSize)
00220 myTempLogSize = myTempLogHeadSize;
00221 if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
00222 myTempLogSize += myTemplogRecSize - rem;
00223 }
00224 unlock_(iLogDatabase->myTempLogArray);
00225 }
00226 pop_(fh);
00227 return_(fh);
00228 }
00229
00230 time_t MSTempLog::adjustWaitTime(time_t then, time_t now)
00231 {
00232 time_t wait;
00233
00234 if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00235 wait = ((then + PBMSParameters::getTempBlobTimeout() - now) * 1000);
00236 if (wait < 2000)
00237 wait = 2000;
00238 else if (wait > 120 * 1000)
00239 wait = 120 * 1000;
00240 }
00241 else
00242 wait = 1;
00243
00244 return wait;
00245 }
00246
00247
00248
00249
00250
00251
00252 MSTempLogThread::MSTempLogThread(time_t wait_time, MSDatabase *db):
00253 CSDaemon(wait_time, NULL),
00254 iTempLogDatabase(db),
00255 iTempLogFile(NULL),
00256 iLogRecSize(0),
00257 iLogOffset(0)
00258 {
00259 }
00260
00261
00262 void MSTempLogThread::close()
00263 {
00264 if (iTempLogFile) {
00265 iTempLogFile->release();
00266 iTempLogFile = NULL;
00267 }
00268 }
00269
/*
 * Process one expired temp log entry inside a try_/catch_ block.
 *
 * self:      the calling thread (used by the try_/catch_ and release-stack macros).
 * buffer:    scratch string buffer passed on to checkBlob().
 * tab_id:    id passed to getRepoFileFromPool() for MS_TL_REPO_REF entries,
 *            otherwise the table id passed to getOpenTableByID().
 * type:      entry type: MS_TL_REPO_REF, MS_TL_BLOB_REF or MS_TL_TABLE_REF.
 * blob_id, auth_code: identify the BLOB for checkBlob().
 *
 * Returns false when the try block ran to completion and true otherwise
 * (rtc is only cleared on the last line of the try block). doWork() reads
 * the error from self->myException when true is returned.
 */
bool MSTempLogThread::try_ReleaseBLOBReference(CSThread *self, CSStringBuffer *buffer, uint32_t tab_id, int type, uint64_t blob_id, uint32_t auth_code)
{
	/* NOTE(review): presumably volatile because try_/catch_ are setjmp/longjmp based - confirm. */
	volatile bool rtc = true;
	try_(a) {
		/* Release the BLOB reference. */
		MSOpenTable *otab;

		if (type == MS_TL_REPO_REF) {
			/* Entry refers to a repository file: tab_id is handed to getRepoFileFromPool(). */
			MSRepoFile *repo_file;

			/* A NULL result is silently skipped. */
			if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
				frompool_(repo_file);
				repo_file->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
				backtopool_(repo_file);
			}
		}
		else {
			/* Entry refers to a table; if no open table can be obtained it is skipped. */
			if ((otab = MSTableList::getOpenTableByID(iTempLogDatabase->myDatabaseID, tab_id))) {
				frompool_(otab);
				if (type == MS_TL_BLOB_REF) {
					otab->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
					backtopool_(otab);
				}
				else {
					ASSERT(type == MS_TL_TABLE_REF);
					/* deleteReferences() is also given &myMustQuit; when it returns true the table itself is removed below. */
					if ((type == MS_TL_TABLE_REF) && otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
						/* Delete the table file and remove the table from its database. */
						MSTable			*tab;
						CSPath			*from_path;
						MSOpenTablePool	*tab_pool;

						tab = otab->getDBTable();
						from_path = otab->getDBTable()->getTableFile();

						/* otab comes off the release stack here; it is handed to lockTablePoolForDeletion() below. */
						pop_(otab);

						push_(from_path);
						tab->retain();
						push_(tab);

						/* NOTE(review): presumably lockTablePoolForDeletion() takes over the otab reference - confirm. */
						tab_pool = MSTableList::lockTablePoolForDeletion(otab);
						frompool_(tab_pool);

						from_path->removeFile();
						tab->myDatabase->removeTable(tab);

						backtopool_(tab_pool);
						/* tab is popped, not released, after removeTable(tab) - presumably removeTable() consumed the retain() above; confirm. */
						pop_(tab);
						release_(from_path);
					}
					else
						backtopool_(otab);
				}
			}
		}

		/* Reached only when nothing above threw. */
		rtc = false;
	}

	catch_(a);
	cont_(a);
	return rtc;
}
00333
00334 bool MSTempLogThread::doWork()
00335 {
00336 size_t tfer;
00337 MSTempLogItemRec log_item;
00338 CSStringBuffer *buffer;
00339 SearchTXNLog txn_log(iTempLogDatabase->myDatabaseID, MSTransactionManager::tm_Log);
00340
00341 enter_();
00342 new_(buffer, CSStringBuffer(20));
00343 push_(buffer);
00344 while (!myMustQuit) {
00345 if (!iTempLogFile) {
00346 size_t head_size;
00347 if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
00348 release_(buffer);
00349 return_(true);
00350 }
00351 iLogOffset = head_size;
00352 }
00353
00354 tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(MSTempLogItemRec), 0);
00355 if (tfer == 0) {
00356
00357
00358
00359 if (iTempLogDatabase->getTempLogCount() <= 1) {
00360
00361
00362
00363
00364 myWaitTime = PBMSParameters::getTempBlobTimeout() * 1000;
00365 break;
00366 }
00367
00368 iTempLogFile->myTempLog->deleteLog();
00369 iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
00370 close();
00371 }
00372 else if (tfer == sizeof(MSTempLogItemRec)) {
00373
00374 int type;
00375 uint32_t tab_id;
00376 uint64_t blob_id= 0;
00377 uint32_t auth_code;
00378 uint32_t then;
00379 time_t now;
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389 tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
00390
00391 type = CS_GET_DISK_1(log_item.ti_type_1);
00392 blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
00393 auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
00394 then = CS_GET_DISK_4(log_item.ti_time_4);
00395
00396 now = time(NULL);
00397 if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00398
00399 myWaitTime = MSTempLog::adjustWaitTime(then, now);
00400 break;
00401 }
00402
00403 if (try_ReleaseBLOBReference(self, buffer, tab_id, type, blob_id, auth_code)) {
00404 int err = self->myException.getErrorCode();
00405
00406 if (err == MS_ERR_TABLE_LOCKED) {
00407 throw_();
00408 }
00409 else if (err == MS_ERR_REMOVING_REPO) {
00410
00411 myWaitTime = 2 * 1000;
00412 release_(buffer);
00413 return_(true);
00414 }
00415 else if ((err == MS_ERR_UNKNOWN_TABLE) || (err == MS_ERR_DATABASE_DELETED))
00416 ;
00417 else
00418 self->myException.log(NULL);
00419 }
00420
00421 }
00422 else {
00423
00424 myWaitTime = 2 * 1000;
00425 break;
00426 }
00427 iLogOffset += iLogRecSize;
00428 }
00429
00430 release_(buffer);
00431 return_(true);
00432 }
00433
00434 void *MSTempLogThread::completeWork()
00435 {
00436 close();
00437 return NULL;
00438 }
00439