Drizzled Public API Documentation

open_table_ms.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-06-04
00023  *
00024  * H&G2JCtL
00025  *
00026  * Media Stream Tables.
00027  *
00028  */
00029 #include "cslib/CSConfig.h"
00030 
00031 #include "defs_ms.h"
00032 
00033 #include "cslib/CSGlobal.h"
00034 #include "cslib/CSLog.h"
00035 #include "cslib/CSStrUtil.h"
00036 #include "cslib/CSPath.h"
00037 
00038 #include "open_table_ms.h"
00039 #include "table_ms.h"
00040 #include "connection_handler_ms.h"
00041 #include "engine_ms.h"
00042 #include "transaction_ms.h"
00043 #include "parameters_ms.h"
00044 
00045 /*
00046  * ---------------------------------------------------------------
00047  * OPEN TABLES
00048  */
00049 
00050 MSOpenTable::MSOpenTable():
00051 CSRefObject(),
00052 CSPooled(),
00053 inUse(true),
00054 isNotATable(false),
00055 nextTable(NULL),
00056 myPool(NULL),
00057 myTableFile(NULL),
00058 myWriteRepo(NULL),
00059 myWriteRepoFile(NULL),
00060 myTempLogFile(NULL),
00061 iNextLink(NULL),
00062 iPrevLink(NULL)
00063 //iUseSize(0),
00064 //iUseCount(0),
00065 //iUsedBlobs(0)
00066 {
00067   memset(myOTBuffer, 0, MS_OT_BUFFER_SIZE); // wipe this to make valgrind happy.
00068 }
00069 
00070 MSOpenTable::~MSOpenTable()
00071 {
00072   close();
00073 }
00074 
00075 void MSOpenTable::close()
00076 {
00077   enter_();
00078   if (myTableFile) {
00079     myTableFile->release();
00080     myTableFile = NULL;
00081   }
00082   closeForWriting();
00083   if (myTempLogFile) {
00084     myTempLogFile->release();
00085     myTempLogFile = NULL;
00086   }
00087 /*
00088   if (iUsedBlobs) {
00089     cs_free(iUsedBlobs);
00090     iUsedBlobs = NULL;
00091   }
00092   iUseCount = 0;
00093   iUseSize = 0;
00094 */
00095   exit_();
00096 }
00097 
00098 void MSOpenTable::returnToPool()
00099 {
00100   MSTableList::releaseTable(this);
00101 }
00102 
00103 // This cleanup class is used to reset the 
00104 // repository size if something goes wrong.
00105 class CreateBlobCleanUp : public CSRefObject {
00106   bool do_cleanup;
00107   uint64_t old_size;
00108   MSOpenTable *ot;
00109   MSRepository *repo;
00110 
00111   public:
00112   
00113   CreateBlobCleanUp(): CSRefObject(),
00114     do_cleanup(false){}
00115     
00116   ~CreateBlobCleanUp() 
00117   {
00118     if (do_cleanup) {
00119       repo->setRepoFileSize(ot, old_size);
00120 
00121     }
00122   }
00123   
00124   void setCleanUp(MSOpenTable *ot_arg, MSRepository *repo_arg, uint64_t size)
00125   {
00126     old_size = size;
00127     repo = repo_arg;
00128     ot = ot_arg;
00129     do_cleanup = true;
00130   }
00131   
00132   void cancelCleanUp()
00133   {
00134     do_cleanup = false;
00135   }
00136   
00137 };
00138 
00139 void MSOpenTable::createBlob(PBMSBlobURLPtr bh, uint64_t blob_size, char *metadata, uint16_t metadata_size, CSInputStream *stream, CloudKeyPtr cloud_key, Md5Digest *checksum)
00140 {
00141   uint64_t repo_offset;
00142   uint64_t blob_id = 0;
00143   uint32_t  auth_code;
00144   uint16_t head_size;
00145   uint32_t  log_id;
00146   uint32_t log_offset;
00147   uint32_t temp_time;
00148   uint64_t repo_size;
00149   uint64_t repo_id;
00150   Md5Digest my_checksum;
00151   CloudKeyRec cloud_key_rec;
00152   CreateBlobCleanUp *cleanup;
00153   enter_();
00154   
00155   new_(cleanup, CreateBlobCleanUp());
00156   push_(cleanup);
00157   
00158   if (!checksum)
00159     checksum = &my_checksum;
00160     
00161   if (stream) push_(stream);
00162   openForWriting();
00163   ASSERT(myWriteRepo);
00164   auth_code = random();
00165   repo_size = myWriteRepo->getRepoFileSize();
00166   temp_time = myWriteRepo->myLastTempTime;
00167 
00168   // If an exception occurs the cleanup operation will be called.
00169   cleanup->setCleanUp(this, myWriteRepo, repo_size);
00170 
00171   head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
00172   if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
00173     pop_(stream);
00174     repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
00175   } else {
00176     ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
00177     CloudDB *cloud = getDB()->myBlobCloud;
00178     
00179     if (!cloud)
00180       CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
00181   
00182     repo_offset = repo_size + head_size;
00183     memset(checksum, 0, sizeof(Md5Digest)); // The checksum is only for local storage.
00184     
00185     // If there is a stream then the data has not been sent to the cloud yet.
00186     if (stream) { 
00187       cloud_key = &cloud_key_rec;
00188       cloud->cl_getNewKey(cloud_key);
00189       pop_(stream);
00190       cloud->cl_putData(cloud_key, stream, blob_size);
00191     }
00192     
00193   }
00194   
00195   repo_id = myWriteRepo->myRepoID;
00196   if (isNotATable) {  
00197     getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
00198     formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
00199   }
00200   else {
00201     blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
00202     getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
00203     formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
00204   }
00205   
00206   myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
00207   
00208   cleanup->cancelCleanUp();
00209   release_(cleanup);
00210   
00211   exit_();
00212 }
00213 
00214 // BLOBs created with this method are always created as standard local BLOBs. (No cloud storage)
00215 void MSOpenTable::createBlob(PBMSBlobIDPtr blob_id, uint64_t blob_size, char *metadata, uint16_t metadata_size)
00216 {
00217   uint64_t repo_size;
00218   uint64_t repo_offset;
00219   uint64_t repo_id;
00220   uint32_t  auth_code;
00221   uint16_t head_size;
00222   uint32_t  log_id;
00223   uint32_t log_offset;
00224   uint32_t temp_time;
00225   CreateBlobCleanUp *cleanup;
00226   enter_();
00227   
00228   new_(cleanup, CreateBlobCleanUp());
00229   push_(cleanup);
00230 
00231   openForWriting();
00232   ASSERT(myWriteRepo);
00233   auth_code = random();
00234   
00235   repo_size = myWriteRepo->getRepoFileSize();
00236   
00237   // If an exception occurs the cleanup operation will be called.
00238   cleanup->setCleanUp(this, myWriteRepo, repo_size);
00239 
00240   head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
00241 
00242   repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
00243   repo_id = myWriteRepo->myRepoID;
00244   temp_time = myWriteRepo->myLastTempTime;
00245   getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
00246   myWriteRepo->myLastTempTime = temp_time;
00247   myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
00248   // myWriteRepo->setRepoFileSize(this, repo_offset + head_size + blob_size);This is now set by writeBlobHead()
00249   
00250   blob_id->bi_db_id = getDB()->myDatabaseID;
00251   blob_id->bi_blob_id = repo_offset;
00252   blob_id->bi_tab_id = repo_id;
00253   blob_id->bi_auth_code = auth_code;
00254   blob_id->bi_blob_size = blob_size;
00255   blob_id->bi_blob_type = MS_URL_TYPE_REPO;
00256   blob_id->bi_blob_ref_id = 0;
00257   
00258   cleanup->cancelCleanUp();
00259   release_(cleanup);
00260 
00261   exit_();
00262 }
00263 
00264 void MSOpenTable::sendRepoBlob(uint64_t blob_id, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool info_only, CSHTTPOutputStream *stream)
00265 {
00266   uint32_t    repo_id;
00267   uint64_t    offset;
00268   uint64_t    size;
00269   uint16_t    head_size;
00270   MSRepoFile  *repo_file;
00271 
00272   enter_();
00273   openForReading();
00274   getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, true);
00275   repo_file = getDB()->getRepoFileFromPool(repo_id, false);
00276   frompool_(repo_file);
00277   //repo_file->sendBlob(this, offset, head_size, size, stream);
00278   repo_file->sendBlob(this, offset, req_offset, req_size, 0, false, info_only, stream);
00279   backtopool_(repo_file);
00280   exit_();
00281 }
00282 
00283 void MSOpenTable::freeReference(uint64_t blob_id, uint64_t blob_ref_id)
00284 {
00285   uint32_t    repo_id;
00286   uint64_t    offset;
00287   uint64_t    blob_size;
00288   uint16_t    head_size;
00289   MSRepoFile  *repo_file;
00290   uint32_t    auth_code = 0;
00291 
00292   enter_();
00293   openForReading();
00294 
00295   getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
00296   repo_file = getDB()->getRepoFileFromPool(repo_id, false);
00297 
00298   frompool_(repo_file);
00299   repo_file->releaseBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
00300   backtopool_(repo_file);
00301 
00302   exit_();
00303 }
00304 
00305 void MSOpenTable::commitReference(uint64_t blob_id, uint64_t blob_ref_id)
00306 {
00307   uint32_t    repo_id;
00308   uint64_t    offset;
00309   uint64_t    blob_size;
00310   uint16_t    head_size;
00311   MSRepoFile  *repo_file;
00312   uint32_t    auth_code = 0;
00313 
00314   enter_();
00315   openForReading();
00316   
00317   getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
00318   repo_file = getDB()->getRepoFileFromPool(repo_id, false);
00319 
00320   frompool_(repo_file);
00321   repo_file->commitBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
00322   backtopool_(repo_file);
00323 
00324   exit_();
00325 }
00326 
00327 void MSOpenTable::useBlob(int type, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code, uint16_t col_index, uint64_t blob_size, uint64_t blob_ref_id, PBMSBlobURLPtr ret_blob_url)
00328 {
00329   MSRepoFile    *repo_file= NULL;
00330   MSBlobHeadRec blob;
00331   CSInputStream *stream;
00332   MSDatabase    *blob_db;
00333   int       state;
00334   uint16_t      head_size;
00335   uint64_t      repo_offset;
00336   uint32_t      repo_id;
00337 
00338   enter_();
00339 
00340   blob_db = getDB();
00341     
00342   if (!blob_db->isRecovering()) {
00343     // During recovery the only thing that needs to be done is to 
00344     // reset the database ID which is done when the URL is created.
00345     // Create the URL using the table ID passed in not the one from 
00346     // the table associated with this object.
00347 
00348     openForReading();
00349     if (type == MS_URL_TYPE_REPO) { // There is no table reference associated with this BLOB yet.
00350       uint32_t    ac;
00351       uint8_t   status;
00352       bool    same_db = true;
00353 
00354       if (blob_db->myDatabaseID == db_id)
00355         repo_file = blob_db->getRepoFileFromPool(tab_id, false);
00356       else {
00357         same_db = false;
00358         blob_db = MSDatabase::getDatabase(db_id);
00359         push_(blob_db);
00360         repo_file = blob_db->getRepoFileFromPool(tab_id, false);
00361         release_(blob_db);
00362         blob_db = repo_file->myRepo->myRepoDatabase;
00363       }
00364     
00365       frompool_(repo_file);
00366       repo_file->read(&blob, blob_id, MS_MIN_BLOB_HEAD_SIZE, MS_MIN_BLOB_HEAD_SIZE);
00367 
00368       repo_offset = blob_id;
00369       blob_size  = CS_GET_DISK_6(blob.rb_blob_data_size_6);
00370       head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00371        
00372       ac = CS_GET_DISK_4(blob.rb_auth_code_4);
00373       if (auth_code != ac)
00374         CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00375       status = CS_GET_DISK_1(blob.rb_status_1);
00376       if ( ! IN_USE_BLOB_STATUS(status))
00377         CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00378 
00379       if (same_db) {
00380         // Create a table reference to the BLOB:
00381         repo_id = tab_id;
00382         blob_id = getDBTable()->createBlobHandle(this, tab_id, blob_id, blob_size, head_size, auth_code);
00383         state = MS_UB_NEW_HANDLE;
00384       }
00385       else {
00386         
00387         getDB()->openWriteRepo(this);
00388 
00389         // If either databases are using cloud storage then this is
00390         // not supported yet.     
00391         if (getDB()->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
00392           CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
00393       
00394         stream = repo_file->getInputStream(repo_offset);
00395         push_(stream);
00396         repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);     
00397         release_(stream);
00398 
00399         // Create a table reference to the BLOB:
00400         repo_id = myWriteRepo->myRepoID;
00401         blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
00402         state = MS_UB_NEW_BLOB;
00403       }
00404       backtopool_(repo_file);
00405     }
00406     else {
00407 
00408       if (blob_db->myDatabaseID == db_id && getDBTable()->myTableID == tab_id) {
00409         getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
00410         
00411         state = MS_UB_SAME_TAB;
00412       }
00413       else {
00414         MSOpenTable *blob_otab;
00415 
00416         blob_otab = MSTableList::getOpenTableByID(db_id, tab_id);
00417         frompool_(blob_otab);
00418         blob_otab->getDBTable()->readBlobHandle(blob_otab, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
00419         if (blob_db->myDatabaseID == db_id) {
00420           blob_id = getDBTable()->findBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
00421           if (blob_id == 0)
00422             blob_id = getDBTable()->createBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
00423           state = MS_UB_NEW_HANDLE;
00424         }
00425         else {
00426 
00427           // If either databases are using cloud storage then this is
00428           // not supported yet.     
00429           if (blob_db->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
00430             CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
00431 
00432           // NOTE: For each BLOB reference copied from one database to another a new
00433           // BLOB will be created. This can result in multiple copies fo the same BLOB
00434           // in the destination database. One way around this would be to redisign things
00435           // so that there is one BLOB repository shared across all databases. 
00436           blob_db->openWriteRepo(this);
00437                   
00438           stream = repo_file->getInputStream(repo_offset);
00439           push_(stream);
00440           
00441           repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);
00442           
00443           release_(stream);
00444 
00445           repo_id = myWriteRepo->myRepoID;
00446           blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
00447           state = MS_UB_NEW_BLOB;
00448         }
00449         backtopool_(blob_otab);
00450       }
00451       
00452     }
00453     
00454     blob_ref_id = blob_db->newBlobRefId();
00455     
00456     // Always use the table ID of this table because regardless of
00457     // where the BLOB ref came from it is being inserted into this table.
00458     tab_id = getDBTable()->myTableID; 
00459     
00460     // Add the BLOB reference to the repository.
00461     repo_file = blob_db->getRepoFileFromPool(repo_id, false);   
00462     frompool_(repo_file);
00463     repo_file->referenceBlob(this, repo_offset, head_size, tab_id, blob_id, blob_ref_id, auth_code, col_index);   
00464     backtopool_(repo_file);
00465     
00466     MSTransactionManager::referenceBLOB(getDB()->myDatabaseID, tab_id, blob_id, blob_ref_id);
00467 
00468   } 
00469   
00470   formatBlobURL(ret_blob_url, blob_id, auth_code, blob_size, tab_id, blob_ref_id);
00471     
00472   exit_();
00473 }
00474 
00475 void MSOpenTable::releaseReference(uint64_t blob_id, uint64_t blob_ref_id)
00476 {
00477   enter_();
00478   
00479   MSTransactionManager::dereferenceBLOB(getDB()->myDatabaseID, getDBTable()->myTableID, blob_id, blob_ref_id);
00480 
00481   exit_();
00482 }
00483 
00484 void MSOpenTable::checkBlob(CSStringBuffer *buffer, uint64_t blob_id, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
00485 {
00486   uint32_t    repo_id;
00487   uint64_t    offset;
00488   uint64_t    size;
00489   uint16_t    head_size;
00490   MSRepoFile  *repo_file;
00491 
00492   enter_();
00493   openForReading();
00494   if (getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, false)) {
00495     if ((repo_file = getDB()->getRepoFileFromPool(repo_id, true))) {
00496       frompool_(repo_file);
00497       repo_file->checkBlob(buffer, offset, auth_code, temp_log_id, temp_log_offset);
00498       backtopool_(repo_file);
00499     }
00500     else
00501       getDBTable()->freeBlobHandle(this, blob_id, repo_id, offset, auth_code);
00502   }
00503   exit_();
00504 }
00505 
00506 bool MSOpenTable::deleteReferences(uint32_t temp_log_id, uint32_t temp_log_offset, bool *must_quit)
00507 {
00508   MSTableHeadRec    tab_head;
00509   off64_t       blob_id;
00510   MSTableBlobRec    tab_blob;
00511   uint32_t        repo_id;
00512   uint64_t        repo_offset;
00513   uint16_t        head_size;
00514   uint32_t        auth_code;
00515   MSRepoFile      *repo_file = NULL;
00516   bool        result = true;
00517 
00518   enter_();
00519   openForReading();
00520   if (myTableFile->read(&tab_head, 0, offsetof(MSTableHeadRec, th_reserved_4), 0) < offsetof(MSTableHeadRec, th_reserved_4))
00521     /* Nothing to read, delete it ... */
00522     goto exit;
00523   if (CS_GET_DISK_4(tab_head.th_temp_log_id_4) != temp_log_id ||
00524     CS_GET_DISK_4(tab_head.th_temp_log_offset_4) != temp_log_offset) {
00525     /* Wrong delete reference (ignore): */
00526     result = false;
00527     goto exit;
00528   }
00529 
00530   blob_id = CS_GET_DISK_2(tab_head.th_head_size_2);
00531   while (blob_id + sizeof(MSTableBlobRec) <= getDBTable()->getTableFileSize()) {
00532     if (*must_quit) {
00533       /* Bit of a waste of work, but we must quit! */
00534       result = false;
00535       break;
00536     }
00537     if (myTableFile->read(&tab_blob, blob_id, sizeof(MSTableBlobRec), 0) < sizeof(MSTableBlobRec))
00538       break;
00539     repo_id = CS_GET_DISK_3(tab_blob.tb_repo_id_3);
00540     repo_offset = CS_GET_DISK_6(tab_blob.tb_offset_6);
00541     head_size = CS_GET_DISK_2(tab_blob.tb_header_size_2);
00542     auth_code = CS_GET_DISK_4(tab_blob.tb_auth_code_4);
00543     if (repo_file && repo_file->myRepo->myRepoID != repo_id) {
00544       backtopool_(repo_file);
00545       repo_file = NULL;
00546     }
00547     if (!repo_file) {
00548       repo_file = getDB()->getRepoFileFromPool(repo_id, true);
00549       if (repo_file)
00550         frompool_(repo_file);
00551     }
00552     if (repo_file) 
00553       repo_file->freeTableReference(this, repo_offset, head_size, getDBTable()->myTableID, blob_id, auth_code);
00554     
00555     blob_id += sizeof(MSTableBlobRec);
00556   }
00557   
00558   if (repo_file)
00559     backtopool_(repo_file);
00560 
00561   exit:
00562   return_(result);
00563 }
00564 
00565 void MSOpenTable::openForReading()
00566 {
00567   if (!myTableFile && !isNotATable)
00568     myTableFile = getDBTable()->openTableFile();
00569 }
00570 
00571 void MSOpenTable::openForWriting()
00572 {
00573   if (myTableFile && myWriteRepo && myWriteRepoFile)
00574     return;
00575   enter_();
00576   openForReading();
00577   if (!myWriteRepo || !myWriteRepoFile)
00578     getDB()->openWriteRepo(this);
00579   exit_();
00580 }
00581 
00582 void MSOpenTable::closeForWriting()
00583 {
00584   if (myWriteRepoFile) {    
00585     myWriteRepoFile->myRepo->syncHead(myWriteRepoFile);
00586     myWriteRepoFile->release();
00587     myWriteRepoFile = NULL;
00588   }
00589   if (myWriteRepo) {
00590     myWriteRepo->unlockRepo(REPO_WRITE);
00591 #ifndef MS_COMPACTOR_POLLS
00592     if (myWriteRepo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
00593       if (myWriteRepo->myRepoDatabase->myCompactorThread)
00594         myWriteRepo->myRepoDatabase->myCompactorThread->wakeup();
00595     }
00596 #endif
00597     myWriteRepo->release();
00598     myWriteRepo = NULL;
00599   }
00600 }
00601 
00602 uint32_t MSOpenTable::getTableID()
00603 {
00604   return myPool->myPoolTable->myTableID;
00605 }
00606 
00607 MSTable *MSOpenTable::getDBTable()
00608 {
00609   return myPool->myPoolTable;
00610 }
00611 
00612 MSDatabase *MSOpenTable::getDB()
00613 {
00614   return myPool->myPoolDB;
00615 }
00616 
00617 void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint32_t tab_id, uint64_t blob_ref_id)
00618 {
00619   MSBlobURLRec blob;
00620   
00621   blob.bu_type = MS_URL_TYPE_BLOB;
00622   blob.bu_db_id = getDB()->myDatabaseID;
00623   blob.bu_tab_id = tab_id;
00624   blob.bu_blob_id = blob_id;
00625   blob.bu_auth_code = auth_code;
00626   blob.bu_server_id = PBMSParameters::getServerID();
00627   blob.bu_blob_size = blob_size;
00628   blob.bu_blob_ref_id = blob_ref_id;
00629   
00630   PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
00631   
00632 }
00633 void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint64_t blob_ref_id)
00634 {
00635   formatBlobURL(blob_url, blob_id, auth_code, blob_size, getDBTable()->myTableID, blob_ref_id);
00636 }
00637 void MSOpenTable::formatRepoURL(PBMSBlobURLPtr blob_url, uint32_t log_id, uint64_t log_offset, uint32_t auth_code, uint64_t blob_size)
00638 {
00639   MSBlobURLRec blob;
00640   
00641   blob.bu_type = MS_URL_TYPE_REPO;
00642   blob.bu_db_id = getDB()->myDatabaseID;
00643   blob.bu_tab_id = log_id;
00644   blob.bu_blob_id = log_offset;
00645   blob.bu_auth_code = auth_code;
00646   blob.bu_server_id = PBMSParameters::getServerID();
00647   blob.bu_blob_size = blob_size;
00648   blob.bu_blob_ref_id = 0;
00649   
00650   PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
00651 }
00652 
00653 MSOpenTable *MSOpenTable::newOpenTable(MSOpenTablePool *pool)
00654 {
00655   MSOpenTable *otab;
00656   
00657   if (!(otab = new MSOpenTable()))
00658     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00659   if ((otab->myPool = pool))
00660     otab->isNotATable = pool->myPoolTable == NULL;
00661   else
00662     otab->isNotATable = false;
00663     
00664   return otab;
00665 }
00666 
00667 /*
00668  * ---------------------------------------------------------------
00669  * OPEN TABLE POOLS
00670  */
00671 
00672 MSOpenTablePool::MSOpenTablePool():
00673 myPoolTableID(0),
00674 isRemovingTP(false),
00675 myPoolTable(NULL),
00676 myPoolDB(NULL),
00677 iTablePool(NULL)
00678 {
00679 }
00680 
00681 MSOpenTablePool::~MSOpenTablePool()
00682 {
00683   isRemovingTP = true;
00684   removeOpenTablesNotInUse();
00685   /* With this, I also delete those that are in use!: */
00686   iPoolTables.clear();
00687   if (myPoolTable)
00688     myPoolTable->release();
00689   if (myPoolDB)
00690     myPoolDB->release();
00691 }
00692 
00693 #ifdef DEBUG
00694 void MSOpenTablePool::check()
00695 {
00696   MSOpenTable *otab, *ptab;
00697   bool    found;
00698 
00699   if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
00700     do {
00701       found = false;
00702       ptab = iTablePool;
00703       while (ptab) {
00704         if (ptab == otab) {
00705           ASSERT(!found);
00706           found = true;
00707         }
00708         ptab = ptab->nextTable;
00709       }
00710       if (otab->inUse) {
00711         ASSERT(!found);
00712       }
00713       else {
00714         ASSERT(found);
00715       }
00716       otab = (MSOpenTable *) otab->getNextLink();
00717     } while (otab);
00718   }
00719   else
00720     ASSERT(!iTablePool);
00721 }
00722 #endif
00723 
00724 /*
00725  * This returns the table referenced. So it is safe from the pool being
00726  * destroyed.
00727  */
00728 MSOpenTable *MSOpenTablePool::getPoolTable()
00729 {
00730   MSOpenTable *otab;
00731 
00732   if ((otab = iTablePool)) {
00733     iTablePool = otab->nextTable;
00734     otab->nextTable = NULL;
00735     ASSERT(!otab->inUse);
00736     otab->inUse = true;
00737     otab->retain();
00738   }
00739   return otab;
00740 }
00741 
00742 void MSOpenTablePool::returnOpenTable(MSOpenTable *otab)
00743 {
00744   otab->inUse = false;
00745   otab->nextTable = iTablePool;
00746   iTablePool = otab;
00747 }
00748 
00749 /*
00750  * Add a table to the pool, but do not release it!
00751  */
00752 void MSOpenTablePool::addOpenTable(MSOpenTable *otab)
00753 {
00754   iPoolTables.addFront(otab);
00755 }
00756 
00757 void MSOpenTablePool::removeOpenTable(MSOpenTable *otab)
00758 {
00759   otab->close();
00760   iPoolTables.remove(otab);
00761 }
00762 
00763 void MSOpenTablePool::removeOpenTablesNotInUse()
00764 {
00765   MSOpenTable *otab, *curr_otab;
00766 
00767   iTablePool = NULL;
00768   /* Remove all tables that are not in use: */
00769   if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
00770     do {
00771       curr_otab = otab;
00772       otab = (MSOpenTable *) otab->getNextLink();
00773       if (!curr_otab->inUse)
00774         iPoolTables.remove(curr_otab);
00775     } while (otab);
00776   }
00777 }
00778 
00779 void MSOpenTablePool::returnToPool()
00780 {
00781   MSTableList::removeTablePool(this);
00782 }
00783 
00784 MSOpenTablePool *MSOpenTablePool::newPool(uint32_t db_id, uint32_t tab_id)
00785 {
00786   MSOpenTablePool *pool;
00787   enter_();
00788   
00789   if (!(pool = new MSOpenTablePool())) {
00790     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00791   }
00792   push_(pool);
00793   pool->myPoolDB = MSDatabase::getDatabase(db_id);
00794   pool->myPoolTableID = tab_id;
00795   if (tab_id)
00796     pool->myPoolTable = pool->myPoolDB->getTable(tab_id, false);
00797   pop_(pool);
00798   return_(pool);
00799 }
00800 
00801 /*
00802  * ---------------------------------------------------------------
00803  * TABLE LIST
00804  */
00805 
00806 CSSyncOrderedList   *MSTableList::gPoolListByID;
00807 
00808 MSTableList::MSTableList()
00809 { 
00810 }
00811 
00812 MSTableList::~MSTableList()
00813 {
00814 }
00815 
00816 void MSTableList::startUp()
00817 {
00818   new_(gPoolListByID, CSSyncOrderedList);
00819 }
00820 
00821 void MSTableList::shutDown()
00822 {
00823   if (gPoolListByID) {
00824     gPoolListByID->clear();
00825     gPoolListByID->release();
00826     gPoolListByID = NULL;
00827   }
00828 }
00829 
00830 class MSTableKey : public CSOrderKey {
00831 public:
00832   uint32_t  myKeyDatabaseID;
00833   uint32_t  myKeyTableID;
00834 
00835   MSTableKey(): myKeyDatabaseID(0), myKeyTableID(0){ }
00836 
00837   virtual ~MSTableKey() {
00838   }
00839 
00840   int compareKey(CSObject *key) {return CSObject::compareKey(key);}
00841   virtual int compareKey(CSOrderKey *x) {
00842     MSTableKey  *key = (MSTableKey *) x;
00843     int     r = 0;
00844 
00845     if (myKeyDatabaseID < key->myKeyDatabaseID)
00846       r = -1;
00847     else if (myKeyDatabaseID > key->myKeyDatabaseID)
00848       r = 1;
00849       
00850     if (r == 0) {
00851       if (myKeyTableID < key->myKeyTableID)
00852         r = -1;
00853       else if (myKeyTableID > key->myKeyTableID)
00854         r = 1;
00855     }
00856     return r;
00857   }
00858 
00859 public:
00860   static MSTableKey *newTableKey(uint32_t db_id, uint32_t tab_id)
00861   {
00862     MSTableKey *key;
00863 
00864     if (!(key = new MSTableKey())) {
00865       CSException::throwOSError(CS_CONTEXT, ENOMEM);
00866     }
00867     key->myKeyDatabaseID = db_id;
00868     key->myKeyTableID = tab_id;
00869     return key;
00870   }
00871 };
00872 
00873 MSOpenTable *MSTableList::getOpenTableByID(uint32_t db_id, uint32_t tab_id)
00874 {
00875   MSOpenTablePool   *pool;
00876   MSOpenTable     *otab = NULL;
00877   MSTableKey      key;
00878 
00879   enter_();
00880   lock_(gPoolListByID);
00881   key.myKeyDatabaseID = db_id;
00882   key.myKeyTableID = tab_id;
00883   pool = (MSOpenTablePool *) gPoolListByID->find(&key);
00884   if (!pool) {
00885     MSTableKey  *key_ptr;
00886     pool = MSOpenTablePool::newPool(db_id, tab_id);
00887     key_ptr = MSTableKey::newTableKey(db_id, tab_id);
00888     gPoolListByID->add(key_ptr, pool);
00889   }
00890   if (!(otab = pool->getPoolTable())) {
00891     otab = MSOpenTable::newOpenTable(pool);
00892     pool->addOpenTable(otab);
00893     otab->retain();
00894   }
00895   unlock_(gPoolListByID);
00896   return_(otab);
00897 }
00898 
00899 MSOpenTable *MSTableList::getOpenTableForDB(uint32_t db_id)
00900 {
00901   return(MSTableList::getOpenTableByID(db_id, 0));
00902 }
00903 
00904 
00905 void MSTableList::releaseTable(MSOpenTable *otab)
00906 {
00907   MSOpenTablePool *pool;
00908 
00909   enter_();
00910   lock_(gPoolListByID);
00911   push_(otab);
00912   if ((pool = otab->myPool)) {
00913     if (pool->isRemovingTP) {
00914       pool->removeOpenTable(otab);
00915       gPoolListByID->wakeup();
00916     }
00917     else
00918       pool->returnOpenTable(otab);
00919   }
00920   release_(otab);
00921   unlock_(gPoolListByID);
00922   exit_();
00923 }
00924 
00925 bool MSTableList::removeTablePoolIfEmpty(MSOpenTablePool *pool)
00926 {
00927   enter_();
00928   if (pool->getSize() == 0) {
00929     MSTableKey  key;
00930     
00931     key.myKeyDatabaseID = pool->myPoolDB->myDatabaseID;
00932     key.myKeyTableID = pool->myPoolTableID;
00933     gPoolListByID->remove(&key);
00934     /* TODO: Remove the table from the database, if it does not exist
00935      * on disk.
00936      */
00937     return_(true);
00938   }
00939   return_(false);
00940 }
00941 
00942 void MSTableList::removeTablePool(MSOpenTablePool *pool)
00943 {
00944   enter_();
00945   lock_(gPoolListByID);
00946   for (;;) {
00947     pool->isRemovingTP = true;
00948     pool->removeOpenTablesNotInUse();
00949     if (removeTablePoolIfEmpty(pool)) 
00950       break;
00951 
00952     /*
00953      * Wait for the tables that are in use to be
00954      * freed.
00955      */
00956     gPoolListByID->wait();
00957   }
00958   unlock_(gPoolListByID);
00959   exit_();
00960 }
00961 
00962 /*
00963  * Close the pool associated with this open table.
00964  */
00965 void MSTableList::removeTablePool(MSOpenTable *otab)
00966 {
00967   MSOpenTablePool *pool;
00968   MSTableKey  key;
00969   
00970   key.myKeyDatabaseID = otab->getDB()->myDatabaseID;
00971   key.myKeyTableID = otab->getTableID();
00972 
00973   enter_();
00974   frompool_(otab);
00975   lock_(gPoolListByID);
00976   for (;;) {
00977     if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key)))
00978       break;
00979     pool->isRemovingTP = true;
00980     pool->removeOpenTablesNotInUse();
00981     if (removeTablePoolIfEmpty(pool))
00982       break;
00983     /*
00984      * Wait for the tables that are in use to be
00985      * freed.
00986      */
00987     gPoolListByID->wait();
00988   }
00989   unlock_(gPoolListByID);
00990   backtopool_(otab);
00991   exit_();
00992 }
00993 
00994 void MSTableList::removeDatabaseTables(MSDatabase *database)
00995 {
00996   MSOpenTablePool *pool;
00997   uint32_t      idx;
00998   
00999 
01000   enter_();
01001   push_(database);
01002   
01003   retry:
01004   lock_(gPoolListByID);
01005   idx = 0;
01006   while ((pool = (MSOpenTablePool *) gPoolListByID->itemAt(idx))) {
01007     if (pool->myPoolDB == database) {
01008       break;
01009     }
01010     idx++;
01011   }
01012   unlock_(gPoolListByID);
01013 
01014   if (pool) {
01015     removeTablePool(pool);
01016     goto retry;
01017   }
01018   
01019   release_(database);
01020   exit_();
01021 }
01022 
01023 // lockTablePoolForDeletion() is only called to lock a pool for a table which is about  to be removed.
01024 // When the pool is returned then it will be removed from the global pool list.
01025 MSOpenTablePool *MSTableList::lockTablePoolForDeletion(uint32_t db_id, uint32_t tab_id, CSString *db_name, CSString *tab_name)
01026 {
01027   MSOpenTablePool *pool;
01028   MSTableKey    key;
01029 
01030   enter_();
01031 
01032   push_(db_name);
01033   if (tab_name)
01034     push_(tab_name);
01035     
01036   key.myKeyDatabaseID = db_id;
01037   key.myKeyTableID = tab_id;
01038   
01039   lock_(gPoolListByID);
01040 
01041   for (;;) {
01042     if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key))) {
01043       char buffer[CS_EXC_MESSAGE_SIZE];
01044 
01045       cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Table is temporarily not available: ");
01046       cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
01047       if(tab_name) {
01048         cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ".");
01049         cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, tab_name->getCString());
01050       }
01051       CSException::throwException(CS_CONTEXT, MS_ERR_TABLE_LOCKED, buffer);
01052     }
01053     pool->isRemovingTP = true;
01054     pool->removeOpenTablesNotInUse();
01055     if (pool->getSize() == 0) {
01056       // pool->retain();  Do not do this. The return to pool will free this by removing it from the list. 
01057       break;
01058     }
01059     /*
01060      * Wait for the tables that are in use to be
01061      * freed.
01062      */
01063     gPoolListByID->wait();
01064   }
01065   unlock_(gPoolListByID);
01066   
01067   if (tab_name)
01068     release_(tab_name);
01069   release_(db_name);
01070   return_(pool);  
01071   
01072 }
01073 
01074 MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSTable *tab)
01075 {
01076   CSString *tab_name = NULL, *db_name;
01077   uint32_t db_id, tab_id;
01078   
01079   enter_();
01080 
01081   db_name = tab->myDatabase->myDatabaseName;
01082   db_name->retain();
01083 
01084   tab_name = tab->myTableName;
01085   tab_name->retain();
01086   
01087   db_id = tab->myDatabase->myDatabaseID;
01088   tab_id = tab->myTableID;
01089   
01090   tab->release();
01091   
01092   return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
01093 }
01094 
01095 MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSOpenTable *otab)
01096 {
01097   CSString *tab_name = NULL, *db_name;
01098   uint32_t db_id, tab_id;
01099   MSTable *tab;
01100 
01101   enter_();
01102   
01103   tab = otab->getDBTable();
01104   if (tab) {
01105     tab_name = tab->myTableName;
01106     tab_name->retain();
01107   }
01108   
01109   db_name = otab->getDB()->myDatabaseName;
01110   db_name->retain();
01111 
01112   db_id = otab->getDB()->myDatabaseID;
01113   tab_id = otab->getTableID();
01114 
01115   otab->returnToPool();
01116 
01117   return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
01118   
01119 }
01120 
01121