00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifdef DRIZZLED
00036 #include <config.h>
00037 #include <drizzled/common.h>
00038 #include <drizzled/session.h>
00039 #include <drizzled/table.h>
00040 #include <drizzled/message/table.pb.h>
00041 #include <drizzled/charset_info.h>
00042 #include <drizzled/table_proto.h>
00043 #include <drizzled/field.h>
00044 #include <drizzled/field/varstring.h>
00045 #endif
00046
00047 #include "cslib/CSConfig.h"
00048
00049 #include <sys/types.h>
00050 #include <inttypes.h>
00051
00052 #include "cslib/CSGlobal.h"
00053 #include "cslib/CSStrUtil.h"
00054 #include "cslib/CSStorage.h"
00055
00056 #include "defs_ms.h"
00057 #include "system_table_ms.h"
00058 #include "open_table_ms.h"
00059 #include "table_ms.h"
00060 #include "database_ms.h"
00061 #include "repository_ms.h"
00062 #include "backup_ms.h"
00063 #include "transaction_ms.h"
00064 #include "systab_variable_ms.h"
00065 #include "systab_backup_ms.h"
00066
00067 uint32_t MSBackupInfo::gMaxInfoRef;
00068 CSSyncSparseArray *MSBackupInfo::gBackupInfo;
00069
00070
00071 MSBackupInfo::MSBackupInfo( uint32_t id,
00072 const char *name,
00073 uint32_t db_id_arg,
00074 time_t start,
00075 time_t end,
00076 bool _isDump,
00077 const char *location,
00078 uint32_t cloudRef_arg,
00079 uint32_t cloudBackupNo_arg ):
00080 backupRefId(id),
00081 db_name(NULL),
00082 db_id(db_id_arg),
00083 startTime(start),
00084 completionTime(end),
00085 dump(_isDump),
00086 isRunning(false),
00087 backupLocation(NULL),
00088 cloudRef(cloudRef_arg),
00089 cloudBackupNo(cloudBackupNo_arg)
00090 {
00091 db_name = CSString::newString(name);
00092 if (location && *location)
00093 backupLocation = CSString::newString(location);
00094 }
00095
00096
00097 MSBackupInfo::~MSBackupInfo()
00098 {
00099 if (db_name)
00100 db_name->release();
00101
00102 if (backupLocation)
00103 backupLocation->release();
00104 }
00105
00106
00107 void MSBackupInfo::startBackup(MSDatabase *pbms_db)
00108 {
00109 MSDatabase *src_db;
00110
00111 enter_();
00112 push_(pbms_db);
00113
00114 src_db = MSDatabase::getDatabase(db_id);
00115 push_(src_db);
00116
00117 startTime = time(NULL);
00118
00119 src_db->startBackup(RETAIN(this));
00120 release_(src_db);
00121
00122 isRunning = true;
00123
00124 pop_(pbms_db);
00125 MSBackupTable::saveTable(pbms_db);
00126 exit_();
00127 }
00128
00129
00130 class StartDumpCleanUp : public CSRefObject {
00131 bool do_cleanup;
00132 uint32_t ref_id;
00133
00134 public:
00135
00136 StartDumpCleanUp(): CSRefObject(),
00137 do_cleanup(false){}
00138
00139 ~StartDumpCleanUp()
00140 {
00141 if (do_cleanup) {
00142 MSBackupInfo::gBackupInfo->remove(ref_id);
00143 }
00144 }
00145
00146 void setCleanUp(uint32_t id)
00147 {
00148 ref_id = id;
00149 do_cleanup = true;
00150 }
00151
00152 void cancelCleanUp()
00153 {
00154 do_cleanup = false;
00155 }
00156
00157 };
00158
00159 MSBackupInfo *MSBackupInfo::startDump(MSDatabase *db, uint32_t cloud_ref, uint32_t backup_no)
00160 {
00161 MSBackupInfo *info;
00162 uint32_t ref_id;
00163 StartDumpCleanUp *cleanup;
00164
00165 enter_();
00166 push_(db);
00167 lock_(gBackupInfo);
00168
00169 ref_id = gMaxInfoRef++;
00170 new_(info, MSBackupInfo(ref_id, db->myDatabaseName->getCString(), db->myDatabaseID, time(NULL), 0, true, NULL, cloud_ref, backup_no));
00171 push_(info);
00172
00173 gBackupInfo->set(ref_id, RETAIN(info));
00174
00175 info->isRunning = true;
00176
00177 pop_(info);
00178 unlock_(gBackupInfo);
00179 push_(info);
00180
00181
00182
00183 new_(cleanup, StartDumpCleanUp());
00184 push_(cleanup);
00185 cleanup->setCleanUp(ref_id);
00186
00187 MSBackupTable::saveTable(RETAIN(db));
00188
00189 cleanup->cancelCleanUp();
00190 release_(cleanup);
00191
00192 pop_(info);
00193 release_(db);
00194
00195 return_(info);
00196 }
00197
00198 void MSBackupInfo::backupCompleted(MSDatabase *db)
00199 {
00200 completionTime = time(NULL);
00201 isRunning = false;
00202 MSBackupTable::saveTable(db);
00203 }
00204
00205
00206 void MSBackupInfo::backupTerminated(MSDatabase *db)
00207 {
00208 enter_();
00209 push_(db);
00210 lock_(gBackupInfo);
00211
00212 gBackupInfo->remove(backupRefId);
00213 unlock_(gBackupInfo);
00214
00215 pop_(db);
00216 MSBackupTable::saveTable(db);
00217 exit_();
00218 }
00219
00220
00221 MSBackup::MSBackup():
00222 CSDaemon(NULL),
00223 bu_info(NULL),
00224 bu_BackupList(NULL),
00225 bu_Compactor(NULL),
00226 bu_BackupRunning(false),
00227 bu_State(BU_COMPLETED),
00228 bu_SourceDatabase(NULL),
00229 bu_Database(NULL),
00230 bu_dst_dump(NULL),
00231 bu_src_dump(NULL),
00232 bu_size(0),
00233 bu_completed(0),
00234 bu_ID(0),
00235 bu_start_time(0),
00236 bu_TransactionManagerSuspended(false)
00237 {
00238 }
00239
00240 MSBackup *MSBackup::newMSBackup(MSBackupInfo *info)
00241 {
00242 MSBackup *bu;
00243 enter_();
00244
00245 push_(info);
00246
00247 new_(bu, MSBackup());
00248 push_(bu);
00249 bu->bu_Database = MSDatabase::getBackupDatabase(RETAIN(info->backupLocation), RETAIN(info->db_name), info->db_id, true);
00250 pop_(bu);
00251
00252 bu->bu_info = info;
00253 pop_(info);
00254
00255 return_(bu);
00256 }
00257
00258
00259 class StartBackupCleanUp : public CSRefObject {
00260 bool do_cleanup;
00261 MSBackup *backup;
00262
00263 public:
00264
00265 StartBackupCleanUp(): CSRefObject(),
00266 do_cleanup(false){}
00267
00268 ~StartBackupCleanUp()
00269 {
00270 if (do_cleanup) {
00271 backup->completeBackup();
00272 }
00273 }
00274
00275 void setCleanUp(MSBackup *bup)
00276 {
00277 backup = bup;
00278 do_cleanup = true;
00279 }
00280
00281 void cancelCleanUp()
00282 {
00283 do_cleanup = false;
00284 }
00285
00286 };
00287
00288 void MSBackup::startBackup(MSDatabase *src_db)
00289 {
00290 CSSyncVector *repo_list;
00291 bool compacting = false;
00292 MSRepository *repo;
00293 StartBackupCleanUp *cleanup;
00294 enter_();
00295
00296
00297
00298 new_(cleanup, StartBackupCleanUp());
00299 push_(cleanup);
00300 cleanup->setCleanUp(this);
00301
00302 bu_SourceDatabase = src_db;
00303 repo_list = bu_SourceDatabase->getRepositoryList();
00304
00305 bu_Compactor = bu_SourceDatabase->getCompactorThread();
00306 if (bu_Compactor) {
00307 bu_Compactor->retain();
00308 bu_Compactor->suspend();
00309 }
00310
00311
00312 lock_(repo_list);
00313
00314 new_(bu_BackupList, CSVector(repo_list->size()));
00315 for (uint32_t i = 0; i<repo_list->size(); i++) {
00316 if ((repo = (MSRepository *) repo_list->get(i))) {
00317 if (!repo->isRemovingFP && !repo->mustBeDeleted) {
00318 bu_BackupList->add(RETAIN(repo));
00319 if (repo->initBackup() == REPO_COMPACTING)
00320 compacting = true;
00321
00322 if (!repo->myRepoHeadSize) {
00323
00324
00325
00326 MSRepoFile *repo_file;
00327
00328
00329
00330
00331 repo_file = repo->openRepoFile();
00332 repo_file->release();
00333
00334
00335
00336 }
00337
00338 bu_size += repo->myRepoFileSize;
00339
00340 }
00341 }
00342 }
00343
00344
00345 uint32_t next_tab = 0;
00346 MSTable *tab;
00347 while ((tab = bu_SourceDatabase->getNextTable(&next_tab))) {
00348 push_(tab);
00349 bu_Database->addTable(tab->myTableID, tab->myTableName->getCString(), 0, false);
00350 release_(tab);
00351 }
00352 unlock_(repo_list);
00353
00354
00355 PBMSSystemTables::transferSystemTables(RETAIN(bu_Database), RETAIN(bu_SourceDatabase));
00356
00357
00358
00359 PBMSSystemTables::loadSystemTables(RETAIN(bu_Database));
00360
00361
00362 bu_Database->myBlobCloud->cl_setBackupInfo(RETAIN(bu_info));
00363
00364
00365
00366
00367
00368
00369
00370 char value[20];
00371 snprintf(value, 20, "%"PRIu32"", bu_info->getBackupRefId());
00372 MSVariableTable::setVariable(RETAIN(bu_Database), BACKUP_NUMBER_VAR, value);
00373
00374
00375
00376
00377 if (bu_Compactor && !compacting) {
00378 bu_Compactor->resume();
00379 bu_Compactor->release();
00380 bu_Compactor = NULL;
00381 }
00382
00383
00384 MSTransactionManager::suspend(true);
00385 bu_TransactionManagerSuspended = true;
00386
00387
00388 bu_ID = bu_start_time = time(NULL);
00389 start();
00390
00391 cleanup->cancelCleanUp();
00392 release_(cleanup);
00393
00394 exit_();
00395 }
00396
00397 void MSBackup::completeBackup()
00398 {
00399 if (bu_TransactionManagerSuspended) {
00400 MSTransactionManager::resume();
00401 bu_TransactionManagerSuspended = false;
00402 }
00403
00404 if (bu_BackupList) {
00405 MSRepository *repo;
00406
00407 while (bu_BackupList->size()) {
00408 repo = (MSRepository *) bu_BackupList->take(0);
00409 if (repo) {
00410 repo->backupCompleted();
00411 repo->release();
00412 }
00413 }
00414 bu_BackupList->release();
00415 bu_BackupList = NULL;
00416 }
00417
00418 if (bu_Compactor) {
00419 bu_Compactor->resume();
00420 bu_Compactor->release();
00421 bu_Compactor = NULL;
00422 }
00423
00424 if (bu_Database) {
00425 if (bu_State == BU_COMPLETED)
00426 bu_Database->releaseBackupDatabase();
00427 else
00428 MSDatabase::dropDatabase(bu_Database);
00429
00430 bu_Database = NULL;
00431 }
00432
00433 if (bu_SourceDatabase){
00434 if (bu_State == BU_COMPLETED)
00435 bu_info->backupCompleted(bu_SourceDatabase);
00436 else
00437 bu_info->backupTerminated(bu_SourceDatabase);
00438
00439 bu_SourceDatabase = NULL;
00440 bu_info->release();
00441 bu_info = NULL;
00442 }
00443
00444 bu_BackupRunning = false;
00445 }
00446
00447 bool MSBackup::doWork()
00448 {
00449 enter_();
00450 try_(a) {
00451 CSMutex *my_lock;
00452 MSRepository *src_repo, *dst_repo;
00453 MSRepoFile *src_file, *dst_file;
00454 off64_t src_offset, prev_offset;
00455 uint16_t head_size;
00456 uint64_t blob_size, blob_data_size;
00457 CSStringBuffer *head;
00458 MSRepoPointersRec ptr;
00459 uint32_t table_ref_count;
00460 uint32_t blob_ref_count;
00461 int ref_count;
00462 size_t ref_size;
00463 uint32_t auth_code;
00464 uint32_t tab_id;
00465 uint64_t blob_id;
00466 MSOpenTable *otab;
00467 uint32_t src_repo_id;
00468 uint8_t status;
00469 uint8_t blob_storage_type;
00470 uint16_t tab_index;
00471 uint32_t mod_time;
00472 char *transferBuffer;
00473 CloudKeyRec cloud_key;
00474
00475
00476 bu_BackupRunning = true;
00477 bu_State = BU_RUNNING;
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489 transferBuffer = (char*) cs_malloc(MS_BACKUP_BUFFER_SIZE);
00490 push_ptr_(transferBuffer);
00491
00492 new_(head, CSStringBuffer(100));
00493 push_(head);
00494
00495 src_repo = (MSRepository*)bu_BackupList->get(0);
00496 while (src_repo && !myMustQuit) {
00497 src_offset = 0;
00498 src_file = src_repo->openRepoFile();
00499 push_(src_file);
00500
00501 dst_repo = bu_Database->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
00502 frompool_(dst_repo);
00503 dst_file = dst_repo->openRepoFile();
00504 push_(dst_file);
00505
00506 src_repo_id = src_repo->myRepoID;
00507 src_offset = src_repo->myRepoHeadSize;
00508 prev_offset = 0;
00509 while (src_offset < src_repo->myRepoFileSize) {
00510 retry_read:
00511
00512 bu_completed += src_offset - prev_offset;
00513 prev_offset = src_offset;
00514 suspended();
00515
00516 if (myMustQuit)
00517 break;
00518
00519
00520
00521
00522 my_lock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
00523 lock_(my_lock);
00524 head->setLength(src_repo->myRepoBlobHeadSize);
00525 if (src_file->read(head->getBuffer(0), src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
00526 unlock_(my_lock);
00527 break;
00528 }
00529
00530 ptr.rp_chars = head->getBuffer(0);
00531 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00532 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00533 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00534 blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
00535 blob_data_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_data_size_6);
00536 auth_code = CS_GET_DISK_4(ptr.rp_head->rb_auth_code_4);
00537 status = CS_GET_DISK_1(ptr.rp_head->rb_status_1);
00538 mod_time = CS_GET_DISK_4(ptr.rp_head->rb_mod_time_4);
00539
00540 blob_storage_type = CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1);
00541 if (blob_storage_type == MS_CLOUD_STORAGE) {
00542 MSRepoFile::getBlobKey(ptr.rp_head, &cloud_key);
00543 }
00544
00545
00546
00547
00548 if (mod_time > bu_start_time)
00549 CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, bu_start_time);
00550
00551
00552
00553 if ((status == MS_BLOB_MOVED) && (bu_ID == (uint32_t) CS_GET_DISK_4(ptr.rp_head->rb_backup_id_4))) {
00554 status = MS_BLOB_REFERENCED;
00555 CS_SET_DISK_1(ptr.rp_head->rb_status_1, status);
00556 }
00557
00558
00559 if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
00560 head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
00561 !VALID_BLOB_STATUS(status)) {
00562
00563 src_offset++;
00564 unlock_(my_lock);
00565 continue;
00566 }
00567
00568
00569 if ((status == MS_BLOB_REFERENCED) || (status == MS_BLOB_MOVED)) {
00570 head->setLength(head_size);
00571 if (src_file->read(head->getBuffer(0) + src_repo->myRepoBlobHeadSize, src_offset + src_repo->myRepoBlobHeadSize, head_size - src_repo->myRepoBlobHeadSize, 0) != (head_size- src_repo->myRepoBlobHeadSize)) {
00572 unlock_(my_lock);
00573 break;
00574 }
00575
00576 table_ref_count = 0;
00577 blob_ref_count = 0;
00578
00579
00580
00581
00582 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00583 for (int count = 0; count < ref_count; count++) {
00584 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00585 case MS_BLOB_FREE_REF:
00586 break;
00587 case MS_BLOB_TABLE_REF:
00588
00589
00590 table_ref_count++;
00591 break;
00592 case MS_BLOB_DELETE_REF:
00593
00594
00595 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00596 break;
00597 default:
00598
00599
00600 tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
00601 if (tab_index && (tab_index <= ref_count)) {
00602
00603 if (IS_COMMITTED(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8))) {
00604 MSRepoTableRefPtr tab_ref;
00605 tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (tab_index-1) * ref_size);
00606 if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
00607 blob_ref_count++;
00608 } else {
00609 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00610 }
00611
00612 } else {
00613
00614 src_offset++;
00615 unlock_(my_lock);
00616 goto retry_read;
00617 }
00618 break;
00619 }
00620 ptr.rp_chars += ref_size;
00621 }
00622
00623
00624
00625 if (table_ref_count && blob_ref_count) {
00626
00627 off64_t dst_offset;
00628
00629 dst_offset = dst_repo->myRepoFileSize;
00630
00631
00632 dst_file->write(head->getBuffer(0), dst_offset, head_size);
00633
00634
00635 if (blob_storage_type == MS_CLOUD_STORAGE) {
00636 bu_Database->myBlobCloud->cl_backupBLOB(&cloud_key);
00637 } else
00638 CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, transferBuffer, MS_BACKUP_BUFFER_SIZE);
00639
00640
00641 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00642 for (int count = 0; count < ref_count; count++) {
00643 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00644 case MS_BLOB_FREE_REF:
00645 case MS_BLOB_DELETE_REF:
00646 break;
00647 case MS_BLOB_TABLE_REF:
00648 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00649 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00650
00651 if ((otab = MSTableList::getOpenTableByID(bu_Database->myDatabaseID, tab_id))) {
00652 frompool_(otab);
00653 otab->getDBTable()->setBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, blob_size, head_size, auth_code);
00654
00655
00656 backtopool_(otab);
00657 }
00658 break;
00659 default:
00660 break;
00661 }
00662 ptr.rp_chars += ref_size;
00663 }
00664
00665 dst_repo->myRepoFileSize += head_size + blob_size;
00666 }
00667 }
00668 unlock_(my_lock);
00669 src_offset += head_size + blob_size;
00670 }
00671 bu_completed += src_offset - prev_offset;
00672
00673
00674 release_(dst_file);
00675 backtopool_(dst_repo);
00676 release_(src_file);
00677
00678
00679 src_repo->backupCompleted();
00680 bu_BackupList->remove(0);
00681
00682 src_repo = (MSRepository*)bu_BackupList->get(0);
00683 }
00684
00685 release_(head);
00686 release_(transferBuffer);
00687 if (myMustQuit)
00688 bu_State = BU_TERMINATED;
00689 else
00690 bu_State = BU_COMPLETED;
00691
00692 }
00693
00694 catch_(a) {
00695 logException();
00696 }
00697
00698 cont_(a);
00699 completeBackup();
00700 myMustQuit = true;
00701 return_(true);
00702 }
00703
00704 void *MSBackup::completeWork()
00705 {
00706 if (bu_SourceDatabase || bu_BackupList || bu_Compactor || bu_info) {
00707
00708 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "MSBackup::completeBackup() not called");
00709 if (bu_SourceDatabase) {
00710 bu_SourceDatabase->release();
00711 bu_SourceDatabase = NULL;
00712 }
00713
00714 if (bu_BackupList) {
00715 bu_BackupList->release();
00716 bu_BackupList = NULL;
00717 }
00718
00719
00720 if (bu_Compactor) {
00721 bu_Compactor->release();
00722 bu_Compactor = NULL;
00723 }
00724
00725
00726 if (bu_info) {
00727 bu_info->release();
00728 bu_info = NULL;
00729 }
00730
00731 }
00732 return NULL;
00733 }