Drizzled Public API Documentation

systab_cloud_ms.cc
00001 /* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Barry Leslie
00020  *
00021  * 2009-10-21
00022  *
00023  * System cloud storage info table.
00024  *
00025  */
00026 #ifdef DRIZZLED
00027 #include <config.h>
00028 #include <drizzled/common.h>
00029 #include <drizzled/session.h>
00030 #endif
00031 
00032 #include "cslib/CSConfig.h"
00033 #include <inttypes.h>
00034 
00035 #include <sys/types.h>
00036 #include <sys/stat.h>
00037 #include <stdlib.h>
00038 #include <time.h>
00039 
00040 //#include "mysql_priv.h"
00041 #include "cslib/CSGlobal.h"
00042 #include "cslib/CSStrUtil.h"
00043 #include "cslib/CSLog.h"
00044 #include "cslib/CSPath.h"
00045 #include "cslib/CSDirectory.h"
00046 
00047 #include "ha_pbms.h"
00048 //#include <plugin.h>
00049 
00050 #include "mysql_ms.h"
00051 #include "database_ms.h"
00052 #include "open_table_ms.h"
00053 #include "discover_ms.h"
00054 #include "systab_util_ms.h"
00055 
00056 #include "systab_cloud_ms.h"
00057 
00058 DT_FIELD_INFO pbms_cloud_info[]=
00059 {
00060   {"Id",      NOVAL,  NULL, MYSQL_TYPE_LONG,    NULL,     NOT_NULL_FLAG,  "The Cloud storage reference ID"},
00061   {"Server",    1024, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 server name"},
00062   {"Bucket",    124,  NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 bucket name"},
00063   {"PublicKey", 124,  NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 public key"},
00064   {"PrivateKey",  124,  NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET,  NOT_NULL_FLAG,  "S3 private key"},
00065   {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
00066 };
00067 
00068 DT_KEY_INFO pbms_cloud_keys[]=
00069 {
00070   {"pbms_cloud_pk", PRI_KEY_FLAG, {"Id", NULL}},
00071   {NULL, 0, {NULL}}
00072 };
00073 
00074 #define MIN_CLOUD_TABLE_SIZE 4
00075 
00076 //----------------------------
00077 void MSCloudTable::startUp()
00078 {
00079   MSCloudInfo::startUp();
00080 }
00081 
00082 //----------------------------
00083 void MSCloudTable::shutDown()
00084 {
00085   MSCloudInfo::shutDown();
00086 }
00087 
00088 //----------------------------
00089 void MSCloudTable::loadTable(MSDatabase *db)
00090 {
00091 
00092   enter_();
00093   
00094   push_(db);
00095   lock_(MSCloudInfo::gCloudInfo);
00096   
00097   if (MSCloudInfo::gMaxInfoRef == 0) {
00098     CSPath  *path;
00099     path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00100     push_(path);
00101 
00102     if (path->exists()) {
00103       CSFile    *file;
00104       SysTabRec *cloudData;
00105       const char  *server, *bucket, *pubKey, *privKey;
00106       uint32_t    info_id;
00107       MSCloudInfo *info;
00108       size_t    size;
00109       
00110       new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
00111       push_(cloudData);
00112 
00113       file = path->openFile(CSFile::READONLY);
00114       push_(file);
00115       size = file->getEOF();
00116       cloudData->setLength(size);
00117       file->read(cloudData->getBuffer(0), 0, size, size);
00118       release_(file);
00119       
00120       cloudData->firstRecord();
00121       MSCloudInfo::gMaxInfoRef = cloudData->getInt4Field();
00122       
00123       if (! cloudData->isValidRecord()) 
00124         MSCloudInfo::gMaxInfoRef = 1;
00125       
00126       while (cloudData->nextRecord()) {
00127         info_id = cloudData->getInt4Field();
00128         server = cloudData->getStringField();
00129         bucket = cloudData->getStringField();
00130         pubKey = cloudData->getStringField();
00131         privKey = cloudData->getStringField();
00132         
00133         if (cloudData->isValidRecord()) {
00134           if (info_id > MSCloudInfo::gMaxInfoRef) {
00135             char msg[80];
00136             snprintf(msg, 80, "Cloud info id (%"PRIu32") larger than expected (%"PRIu32")\n", info_id, MSCloudInfo::gMaxInfoRef);
00137             CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
00138             CSL.log(self, CSLog::Warning, msg);
00139             MSCloudInfo::gMaxInfoRef = info_id +1;
00140           }
00141           if ( MSCloudInfo::gCloudInfo->get(info_id)) {
00142             char msg[80];
00143             snprintf(msg, 80, "Duplicate Cloud info id (%"PRIu32") being ignored\n", info_id);
00144             CSL.log(self, CSLog::Warning, "pbms "CLOUD_TABLE_NAME".dat :possible damaged file or record. ");
00145             CSL.log(self, CSLog::Warning, msg);
00146           } else {
00147             new_(info, MSCloudInfo( info_id, server, bucket, pubKey, privKey));
00148             MSCloudInfo::gCloudInfo->set(info_id, info);
00149           }
00150         }
00151       }
00152       release_(cloudData); cloudData = NULL;
00153       
00154     } else
00155       MSCloudInfo::gMaxInfoRef = 1;
00156     
00157     release_(path);
00158     
00159   }
00160   unlock_(MSCloudInfo::gCloudInfo);
00161 
00162   release_(db);
00163 
00164   exit_();
00165 }
00166 
00167 void MSCloudTable::saveTable(MSDatabase *db)
00168 {
00169   SysTabRec   *cloudData;
00170   MSCloudInfo   *info;
00171   enter_();
00172   
00173   push_(db);
00174   
00175   new_(cloudData, SysTabRec("pbms", CLOUD_TABLE_NAME".dat", CLOUD_TABLE_NAME));
00176   push_(cloudData);
00177   
00178   // Build the table records
00179   cloudData->clear();
00180   lock_(MSCloudInfo::gCloudInfo);
00181   
00182   cloudData->beginRecord(); 
00183   cloudData->setInt4Field(MSCloudInfo::gMaxInfoRef);
00184   cloudData->endRecord(); 
00185   for  (int i = 0;(info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->itemAt(i)); i++) { // info is not referenced.
00186     
00187     cloudData->beginRecord(); 
00188     cloudData->setInt4Field(info->getCloudRefId());
00189     cloudData->setStringField(info->getServer());
00190     cloudData->setStringField(info->getBucket());
00191     cloudData->setStringField(info->getPublicKey());
00192     cloudData->setStringField(info->getPrivateKey());
00193     cloudData->endRecord();     
00194   }
00195   unlock_(MSCloudInfo::gCloudInfo);
00196 
00197   restoreTable(RETAIN(db), cloudData->getBuffer(0), cloudData->length(), false);
00198   
00199   release_(cloudData);
00200   release_(db);
00201   exit_();
00202 }
00203 
00204 
00205 MSCloudTable::MSCloudTable(MSSystemTableShare *share, TABLE *table):
00206 MSOpenSystemTable(share, table),
00207 iCloudIndex(0)
00208 {
00209 }
00210 
00211 MSCloudTable::~MSCloudTable()
00212 {
00213   //unuse();
00214 }
00215 
00216 void MSCloudTable::use()
00217 {
00218   MSCloudInfo::gCloudInfo->lock();
00219 }
00220 
00221 void MSCloudTable::unuse()
00222 {
00223   MSCloudInfo::gCloudInfo->unlock();
00224   
00225 }
00226 
00227 
00228 void MSCloudTable::seqScanInit()
00229 {
00230   iCloudIndex = 0;
00231 }
00232 
00233 #define MAX_PASSWORD ((int32_t)64)
00234 bool MSCloudTable::seqScanNext(char *buf)
00235 {
00236   char    passwd[MAX_PASSWORD +1];
00237   TABLE   *table = mySQLTable;
00238   Field   *curr_field;
00239   byte    *save;
00240   MY_BITMAP *save_write_set;
00241   MSCloudInfo *info;
00242   const char  *val;
00243   
00244   enter_();
00245   
00246   info = (MSCloudInfo *) MSCloudInfo::gCloudInfo->itemAt(iCloudIndex++); // Object is not referenced.
00247   if (!info)
00248     return_(false);
00249   
00250   save_write_set = table->write_set;
00251   table->write_set = NULL;
00252 
00253 #ifdef DRIZZLED
00254   memset(buf, 0xFF, table->getNullBytes());
00255 #else
00256   memset(buf, 0xFF, table->s->null_bytes);
00257 #endif
00258   for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
00259     curr_field = *field;
00260     save = curr_field->ptr;
00261 #if MYSQL_VERSION_ID < 50114
00262     curr_field->ptr = (byte *) buf + curr_field->offset();
00263 #else
00264 #ifdef DRIZZLED
00265     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
00266 #else
00267     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
00268 #endif
00269 #endif
00270     switch (curr_field->field_name[0]) {
00271       case 'I':
00272         ASSERT(strcmp(curr_field->field_name, "Id") == 0);
00273         curr_field->store(info->getCloudRefId(), true);
00274         break;
00275 
00276       case 'S':
00277         ASSERT(strcmp(curr_field->field_name, "Server") == 0);
00278         val = info->getServer();
00279         curr_field->store(val, strlen(val), &UTF8_CHARSET);
00280         setNotNullInRecord(curr_field, buf);
00281         break;
00282 
00283       case 'B': 
00284         ASSERT(strcmp(curr_field->field_name, "Bucket") == 0);
00285         val = info->getBucket();
00286         curr_field->store(val, strlen(val), &UTF8_CHARSET);
00287         setNotNullInRecord(curr_field, buf);
00288         break;
00289 
00290       case 'P': 
00291         if (curr_field->field_name[1] == 'u') {
00292           ASSERT(strcmp(curr_field->field_name, "PublicKey") == 0);
00293           val = info->getPublicKey();
00294         } else if (curr_field->field_name[1] == 'r') {
00295           ASSERT(strcmp(curr_field->field_name, "PrivateKey") == 0);
00296           val = info->getPrivateKey();
00297           
00298           int32_t i;
00299           for (i = 0; (i < MAX_PASSWORD) && (i < (int32_t)strlen(val)); i++) passwd[i] = '*';
00300           passwd[i] = 0;
00301           val = passwd;
00302         } else {
00303           ASSERT(false);
00304           break;
00305         }
00306         curr_field->store(val, strlen(val), &UTF8_CHARSET);
00307         setNotNullInRecord(curr_field, buf);
00308         break;
00309         
00310       default:
00311         ASSERT(false);
00312     }
00313     curr_field->ptr = save;
00314   }
00315 
00316   table->write_set = save_write_set;
00317   
00318   return_(true);
00319 }
00320 
00321 void MSCloudTable::seqScanPos(unsigned char *pos )
00322 {
00323   int32_t index = iCloudIndex -1;
00324   if (index < 0)
00325     index = 0; // This is probably an error condition.
00326     
00327   mi_int4store(pos, index);
00328 }
00329 
00330 void MSCloudTable::seqScanRead(unsigned char *pos , char *buf)
00331 {
00332   iCloudIndex = mi_uint4korr(pos);
00333   seqScanNext(buf);
00334 }
00335 
00336 void MSCloudTable::updateRow(char *old_data, char *new_data) 
00337 {
00338   uint32_t n_id, o_id, o_indx, n_indx;
00339   const char *realPrivKey;
00340   String server, bucket, pubKey, privKey;
00341   String o_server, o_bucket, o_pubKey, o_privKey;
00342   MSCloudInfo *info;
00343 
00344   enter_();
00345   
00346   getFieldValue(new_data, 0, &n_id);
00347   getFieldValue(new_data, 1, &server);
00348   getFieldValue(new_data, 2, &bucket);
00349   getFieldValue(new_data, 3, &pubKey);
00350   getFieldValue(new_data, 4, &privKey);
00351 
00352   getFieldValue(old_data, 0, &o_id);
00353   getFieldValue(old_data, 1, &o_server);
00354   getFieldValue(old_data, 2, &o_bucket);
00355   getFieldValue(old_data, 3, &o_pubKey);
00356   getFieldValue(old_data, 4, &o_privKey);
00357 
00358   // The cloud ID must be unique
00359   if ((o_id !=  n_id) && MSCloudInfo::gCloudInfo->get(n_id)) {
00360     CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to update a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
00361   }
00362 
00363   // The private key is masked when returned to the caller, so
00364   // unless the caller has updated it we need to get the real 
00365   // private key from the old record.
00366   if (strcmp(privKey.c_ptr(), o_privKey.c_ptr()))
00367     realPrivKey = privKey.c_ptr();
00368   else {
00369     info = (MSCloudInfo*) MSCloudInfo::gCloudInfo->get(o_id); // unreference pointer
00370     realPrivKey = info->getPrivateKey();
00371   }
00372   
00373   new_(info, MSCloudInfo( n_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), realPrivKey));
00374   push_(info);
00375   
00376   o_indx = MSCloudInfo::gCloudInfo->getIndex(o_id);
00377 
00378   MSCloudInfo::gCloudInfo->remove(o_id);
00379   pop_(info);
00380   MSCloudInfo::gCloudInfo->set(n_id, info);
00381   n_indx = MSCloudInfo::gCloudInfo->getIndex(n_id);
00382   
00383   // Adjust the current position in the array if required.
00384   if (o_indx < n_indx )
00385     iCloudIndex--;
00386     
00387   saveTable(RETAIN(myShare->mySysDatabase));
00388   exit_();
00389 }
00390 
00391 void MSCloudTable::insertRow(char *data) 
00392 {
00393   uint32_t ref_id;
00394   String server, bucket, pubKey, privKey;
00395   MSCloudInfo *info;
00396 
00397   enter_();
00398   
00399   getFieldValue(data, 0, &ref_id);
00400     
00401   // The cloud ID must be unique
00402   if (ref_id && MSCloudInfo::gCloudInfo->get(ref_id)) {
00403     CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Attempt to insert a row with a duplicate key in the "CLOUD_TABLE_NAME" table.");
00404   }
00405   
00406   getFieldValue(data, 1, &server);
00407   getFieldValue(data, 2, &bucket);
00408   getFieldValue(data, 3, &pubKey);
00409   getFieldValue(data, 4, &privKey);
00410 
00411   if (ref_id == 0)
00412     ref_id = MSCloudInfo::gMaxInfoRef++;
00413   else if (ref_id >= MSCloudInfo::gMaxInfoRef)
00414     MSCloudInfo::gMaxInfoRef = ref_id +1;
00415     
00416   new_(info, MSCloudInfo( ref_id, server.c_ptr(), bucket.c_ptr(), pubKey.c_ptr(), privKey.c_ptr()));
00417   MSCloudInfo::gCloudInfo->set(ref_id, info);
00418   
00419   saveTable(RETAIN(myShare->mySysDatabase));
00420   exit_();
00421 }
00422 
00423 void MSCloudTable::deleteRow(char *data) 
00424 {
00425   uint32_t ref_id, indx;
00426 
00427   enter_();
00428   
00429   getFieldValue(data, 0, &ref_id);
00430     
00431   // Adjust the current position in the array if required.
00432   indx = MSCloudInfo::gCloudInfo->getIndex(ref_id);
00433   if (indx <= iCloudIndex)
00434     iCloudIndex--;
00435 
00436   MSCloudInfo::gCloudInfo->remove(ref_id);
00437   saveTable(RETAIN(myShare->mySysDatabase));
00438   exit_();
00439 }
00440 
00441 void MSCloudTable::transferTable(MSDatabase *to_db, MSDatabase *from_db)
00442 {
00443   CSPath  *path;
00444   enter_();
00445   
00446   push_(from_db);
00447   push_(to_db);
00448   
00449   path = CSPath::newPath(getPBMSPath(RETAIN(from_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
00450   push_(path);
00451   if (path->exists()) {
00452     CSPath  *bu_path;
00453     bu_path = CSPath::newPath(getPBMSPath(RETAIN(to_db->myDatabasePath)), CLOUD_TABLE_NAME".dat");
00454     path->copyTo(bu_path, true);
00455   }
00456   
00457   release_(path);
00458   release_(to_db);
00459   release_(from_db);
00460   
00461   exit_();
00462 }
00463 
00464 CSStringBuffer *MSCloudTable::dumpTable(MSDatabase *db)
00465 {
00466 
00467   CSPath      *path;
00468   CSStringBuffer  *dump;
00469 
00470   enter_();
00471   
00472   push_(db);
00473   path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00474   release_(db);
00475   
00476   push_(path);
00477   new_(dump, CSStringBuffer(20));
00478   push_(dump);
00479 
00480   if (path->exists()) {
00481     CSFile  *file;
00482     size_t  size;
00483     
00484     file = path->openFile(CSFile::READONLY);
00485     push_(file);
00486     
00487     size = file->getEOF();
00488     dump->setLength(size);
00489     file->read(dump->getBuffer(0), 0, size, size);
00490     release_(file);
00491   }
00492   
00493   pop_(dump);
00494   release_(path);
00495   return_(dump);
00496 }
00497 
00498 void MSCloudTable::restoreTable(MSDatabase *db, const char *data, size_t size, bool reload)
00499 {
00500   CSPath  *path;
00501   CSFile  *file;
00502 
00503   enter_();
00504   
00505   push_(db);
00506   path = getSysFile(getPBMSPath(RETAIN(db->myDatabasePath)), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00507   push_(path);
00508   
00509   file = path->openFile(CSFile::CREATE | CSFile::TRUNCATE);
00510   push_(file);
00511   
00512   file->write(data, 0, size);
00513   file->close();
00514   release_(file);
00515   
00516   release_(path);
00517   
00518   pop_(db);
00519   if (reload)
00520     loadTable(db);
00521   else
00522     db->release();
00523     
00524   exit_();
00525 }
00526 
00527 void MSCloudTable::removeTable(CSString *db_path)
00528 {
00529   CSPath  *path;
00530   char pbms_path[PATH_MAX];
00531   
00532   enter_();
00533   
00534   push_(db_path); 
00535   cs_strcpy(PATH_MAX, pbms_path, db_path->getCString());
00536   release_(db_path);
00537   
00538   if (strcmp(cs_last_name_of_path(pbms_path), "pbms")  != 0)
00539     exit_();
00540     
00541   cs_remove_last_name_of_path(pbms_path);
00542 
00543   path = getSysFile(CSString::newString(pbms_path), CLOUD_TABLE_NAME, MIN_CLOUD_TABLE_SIZE);
00544   push_(path);
00545   
00546   if (path->exists())
00547     path->removeFile();
00548   release_(path);
00549   
00550   exit_();
00551 }
00552