Drizzled Public API Documentation

system_table_ms.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-07-18
00023  *
00024  * H&G2JCtL
00025  *
00026  * System tables.
00027  *
00028  */
00029 
00030 #ifdef DRIZZLED
00031 #include <config.h>
00032 #include <drizzled/common.h>
00033 #include <drizzled/session.h>
00034 #include <drizzled/table.h>
00035 #include <drizzled/field.h>
00036 #include <drizzled/field/blob.h>
00037 
00038 #include <drizzled/message/table.pb.h>
00039 #include <drizzled/charset_info.h>
00040 #include <drizzled/table_proto.h>
00041 #endif
00042 
00043 
00044 #include "cslib/CSConfig.h"
00045 #include <inttypes.h>
00046 
00047 #include <sys/types.h>
00048 #include <sys/stat.h>
00049 #include <stdlib.h>
00050 #include <time.h>
00051 
00052 //#include "mysql_priv.h"
00053 //#include <plugin.h>
00054 
00055 #include "cslib/CSGlobal.h"
00056 #include "cslib/CSStrUtil.h"
00057 #include "ha_pbms.h"
00058 
00059 #include "mysql_ms.h"
00060 #include "engine_ms.h"
00061 #include "system_table_ms.h"
00062 #include "repository_ms.h"
00063 #include "database_ms.h"
00064 #include "compactor_ms.h"
00065 #include "open_table_ms.h"
00066 #include "metadata_ms.h"
00067 #ifdef HAVE_ALIAS_SUPPORT
00068 #include "alias_ms.h"
00069 #endif
00070 #include "cloud_ms.h"
00071 #include "transaction_ms.h"
00072 
00073 #include "systab_httpheader_ms.h"
00074 #include "systab_dump_ms.h"
00075 #include "systab_variable_ms.h"
00076 #include "systab_cloud_ms.h"
00077 #include "systab_backup_ms.h"
00078 #ifndef DRIZZLED
00079 #include "systab_enabled_ms.h"
00080 #endif
00081 #include "discover_ms.h"
00082 #include "parameters_ms.h"
00083 
00085 //#ifdef new
00086 //#undef new
00087 //#endif
00088 
00089 /* Definitions for PBMS table discovery: */
00090 //--------------------------------
00091 static DT_FIELD_INFO pbms_repository_info[]=
00092 {
00093 #ifdef HAVE_ALIAS_SUPPORT
00094   {"Blob_alias",      BLOB_ALIAS_LENGTH, NULL, MYSQL_TYPE_VARCHAR,    &my_charset_utf8_bin, 0,                "The BLOB alias"},
00095 #endif
00096   {"Repository_id",   NOVAL, NULL, MYSQL_TYPE_LONG,   NULL,         NOT_NULL_FLAG,          "The repository file number"},
00097   {"Repo_blob_offset",  NOVAL, NULL, MYSQL_TYPE_LONGLONG, NULL,         NOT_NULL_FLAG,          "The offset of the BLOB in the repository file"},
00098   {"Blob_size",     NOVAL, NULL, MYSQL_TYPE_LONGLONG, NULL,         NOT_NULL_FLAG,          "The size of the BLOB in bytes"},
00099   {"MD5_Checksum",    32,   NULL, MYSQL_TYPE_VARCHAR,   system_charset_info,  0,                "The MD5 Digest of the BLOB data."},
00100   {"Head_size",     NOVAL, NULL, MYSQL_TYPE_SHORT,    NULL,         NOT_NULL_FLAG | UNSIGNED_FLAG,  "The size of the BLOB header - proceeds the BLOB data"},
00101   {"Access_code",     NOVAL, NULL, MYSQL_TYPE_LONG,   NULL,         NOT_NULL_FLAG,          "The 4-byte authorisation code required to access the BLOB - part of the BLOB URL"},
00102   {"Creation_time",   NOVAL, NULL, MYSQL_TYPE_TIMESTAMP,  NULL,         NOT_NULL_FLAG,          "The time the BLOB was created"},
00103   {"Last_ref_time",   NOVAL, NULL, MYSQL_TYPE_TIMESTAMP,  NULL,         0,                "The last time the BLOB was referenced"},
00104   {"Last_access_time",  NOVAL, NULL, MYSQL_TYPE_TIMESTAMP,  NULL,         0,                "The last time the BLOB was accessed (read)"},
00105   {"Access_count",    NOVAL, NULL, MYSQL_TYPE_LONG,   NULL,         NOT_NULL_FLAG,          "The count of the number of times the BLOB has been read"},
00106   {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
00107 };
00108 
00109 #ifdef PBMS_HAS_KEYS
00110 static DT_KEY_INFO pbms_repository_keys[]=
00111 {
00112   {"pbms_repository_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
00113   {NULL, 0, {NULL}}
00114 };
00115 #endif
00116 
00117 static DT_FIELD_INFO pbms_metadata_info[]=
00118 {
00119   {"Repository_id",   NOVAL,          NULL, MYSQL_TYPE_LONG,    NULL,             NOT_NULL_FLAG,  "The repository file number"},
00120   {"Repo_blob_offset",  NOVAL,          NULL, MYSQL_TYPE_LONGLONG,  NULL,             NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
00121   {"Name",        MS_META_NAME_SIZE,  NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET,  NOT_NULL_FLAG,  "Metadata name"},
00122   {"Value",       MS_META_VALUE_SIZE, NULL, MYSQL_TYPE_VARCHAR, &UTF8_CHARSET,      NOT_NULL_FLAG,  "Metadata value"},
00123   {NULL,          NOVAL,          NULL, MYSQL_TYPE_STRING,  NULL, 0, NULL}
00124 };
00125 
00126 #ifdef PBMS_HAS_KEYS
00127 static DT_KEY_INFO pbms_metadata_keys[]=
00128 {
00129   {"pbms_metadata_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
00130   {NULL, 0, {NULL}}
00131 };
00132 #endif
00133 
00134 
00135 #ifdef HAVE_ALIAS_SUPPORT
00136 static DT_FIELD_INFO pbms_alias_info[]=
00137 {
00138   {"Repository_id",   NOVAL, NULL, MYSQL_TYPE_LONG,   NULL,       NOT_NULL_FLAG,  "The repository file number"},
00139   {"Repo_blob_offset",  NOVAL, NULL, MYSQL_TYPE_LONGLONG, NULL,       NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
00140   {"Blob_alias",      BLOB_ALIAS_LENGTH, NULL, MYSQL_TYPE_VARCHAR,    &my_charset_utf8_bin, NOT_NULL_FLAG,      "The BLOB alias"},
00141   {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
00142 };
00143 
00144 static DT_KEY_INFO pbms_alias_keys[]=
00145 {
00146   {"pbms_alias_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
00147   {NULL, 0, {NULL}}
00148 };
00149 #endif
00150 
00151 static DT_FIELD_INFO pbms_blobs_info[]=
00152 {
00153   {"Repository_id",   NOVAL, NULL, MYSQL_TYPE_LONG,   NULL,       NOT_NULL_FLAG,  "The repository file number"},
00154   {"Repo_blob_offset",  NOVAL, NULL, MYSQL_TYPE_LONGLONG, NULL,       NOT_NULL_FLAG,  "The offset of the BLOB in the repository file"},
00155   {"Blob_data",     NOVAL, NULL, MYSQL_TYPE_LONG_BLOB,  &my_charset_bin,  NOT_NULL_FLAG,  "The data of this BLOB"},
00156   {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
00157 };
00158 
00159 #ifdef PBMS_HAS_KEYS
00160 static DT_KEY_INFO pbms_blobs_keys[]=
00161 {
00162   {"pbms_blobs_pk", PRI_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
00163   {NULL, 0, {NULL}}
00164 };
00165 #endif
00166 
00167 static DT_FIELD_INFO pbms_reference_info[]=
00168 {
00169   {"Table_name",    MS_TABLE_NAME_SIZE,   NULL, MYSQL_TYPE_STRING,  system_charset_info,  0,  "The name of the referencing table"},
00170   {"Column_ordinal",  NOVAL,          NULL, MYSQL_TYPE_LONG,    NULL,         0,  "The column ordinal of the referencing field"},
00171   {"Blob_id",     NOVAL,          NULL, MYSQL_TYPE_LONGLONG,  NULL,         NOT_NULL_FLAG,  "The BLOB reference number - part of the BLOB URL"},
00172   {"Blob_url",    PBMS_BLOB_URL_SIZE,   NULL, MYSQL_TYPE_VARCHAR, system_charset_info,  0,  "The BLOB URL for HTTP GET access"},
00173   {"Repository_id", NOVAL,          NULL, MYSQL_TYPE_LONG,    NULL,         NOT_NULL_FLAG,  "The repository file number of the BLOB"},
00174   {"Repo_blob_offset",NOVAL,          NULL, MYSQL_TYPE_LONGLONG,  NULL,         NOT_NULL_FLAG,  "The offset in the repository file"},
00175   {"Blob_size",   NOVAL,          NULL, MYSQL_TYPE_LONGLONG,  NULL,         NOT_NULL_FLAG,  "The size of the BLOB in bytes"},
00176   {"Deletion_time", NOVAL,          NULL, MYSQL_TYPE_TIMESTAMP, NULL,         0,        "The time the BLOB was deleted"},
00177   {"Remove_in",   NOVAL,          NULL, MYSQL_TYPE_LONG,    NULL,         0,        "The number of seconds before the reference/BLOB is removed perminently"},
00178   {"Temp_log_id",   NOVAL,          NULL, MYSQL_TYPE_LONG,    NULL,         0,        "Temporary log number of the referencing deletion entry"},
00179   {"Temp_log_offset", NOVAL,          NULL, MYSQL_TYPE_LONGLONG,  NULL,         0,        "Temporary log offset of the referencing deletion entry"},
00180   {NULL,NOVAL, NULL, MYSQL_TYPE_STRING,NULL, 0, NULL}
00181 };
00182 
00183 #ifdef PBMS_HAS_KEYS
00184 static DT_KEY_INFO pbms_reference_keys[]=
00185 {
00186   {"pbms_reference_pk", PRI_KEY_FLAG, {"Table_name", "Blob_id", NULL}},
00187   {"pbms_reference_k", MULTIPLE_KEY_FLAG, {"Repository_id", "Repo_blob_offset", NULL}},
00188   {NULL, 0, {NULL}}
00189 };
00190 #endif
00191 
00192 
00193 typedef enum {  SYS_REP = 0, 
00194         SYS_REF, 
00195         SYS_BLOB, 
00196         SYS_DUMP, 
00197         SYS_META, 
00198         SYS_HTTP, 
00199 #ifdef HAVE_ALIAS_SUPPORT
00200         SYS_ALIAS, 
00201 #endif
00202         SYS_VARIABLE, 
00203         SYS_CLOUD, 
00204         SYS_BACKUP, 
00205 #ifndef DRIZZLED
00206         SYS_ENABLED, 
00207 #endif
00208         SYS_UNKNOWN} SysTableType;
00209         
00210 static const char *sysTableNames[] = {
00211   "pbms_repository",
00212   "pbms_reference",
00213   "pbms_blob",
00214   "pbms_dump",
00215   "pbms_metadata",
00216   METADATA_HEADER_NAME,
00217 #ifdef HAVE_ALIAS_SUPPORT
00218   "pbms_alias",
00219 #endif
00220   VARIABLES_TABLE_NAME,
00221   CLOUD_TABLE_NAME,
00222   BACKUP_TABLE_NAME,
00223 #ifndef DRIZZLED
00224   ENABLED_TABLE_NAME,
00225 #endif
00226   NULL
00227 };
00228 
00229 static INTERRNAL_TABLE_INFO pbms_internal_tables[]=
00230 {
00231 #ifdef PBMS_HAS_KEYS
00232   { false, sysTableNames[SYS_REP],pbms_repository_info, pbms_repository_keys},
00233   { false, sysTableNames[SYS_REF], pbms_reference_info, pbms_reference_keys},
00234   { false, sysTableNames[SYS_BLOB], pbms_blobs_info, pbms_blobs_keys},
00235   { false, sysTableNames[SYS_DUMP], pbms_dump_info, pbms_dump_keys},
00236   { false, sysTableNames[SYS_META], pbms_metadata_info, pbms_metadata_keys},
00237   { false, sysTableNames[SYS_HTTP], pbms_metadata_headers_info, pbms_metadata_headers_keys},
00238 #ifdef HAVE_ALIAS_SUPPORT
00239   { false, sysTableNames[SYS_ALIAS], pbms_alias_info, pbms_alias_keys},
00240 #endif
00241   { false, sysTableNames[SYS_VARIABLE], pbms_variable_info, pbms_variable_keys},
00242   { true, sysTableNames[SYS_CLOUD], pbms_cloud_info, pbms_cloud_keys},
00243   { true, sysTableNames[SYS_BACKUP], pbms_backup_info, pbms_backup_keys},
00244 #ifndef DRIZZLED
00245   { true, sysTableNames[SYS_ENABLED], pbms_enabled_info, pbms_enabled_keys},
00246 #endif
00247 #else
00248   { false, sysTableNames[SYS_REP], pbms_repository_info, NULL},
00249   { false, sysTableNames[SYS_REF], pbms_reference_info, NULL},
00250   { false, sysTableNames[SYS_BLOB], pbms_blobs_info, NULL},
00251   { false, sysTableNames[SYS_DUMP], pbms_dump_info, NULL},
00252   { false, sysTableNames[SYS_META], pbms_metadata_info, NULL},
00253   { false, sysTableNames[SYS_HTTP], pbms_metadata_headers_info, NULL},
00254 #ifdef HAVE_ALIAS_SUPPORT
00255   { false, sysTableNames[SYS_ALIAS], pbms_alias_info, NULL},
00256 #endif
00257   { false, sysTableNames[SYS_VARIABLE], pbms_variable_info, NULL},
00258   { true, sysTableNames[SYS_CLOUD], pbms_cloud_info, NULL},
00259   { true, sysTableNames[SYS_BACKUP], pbms_backup_info, NULL},
00260 #ifndef DRIZZLED
00261   { true, sysTableNames[SYS_ENABLED], pbms_enabled_info, NULL},
00262 #endif
00263 #endif
00264 
00265   { false, NULL, NULL, NULL}
00266   
00267 };
00268 
00269 //--------------------------
00270 static SysTableType pbms_systable_type(const char *table)
00271 {
00272   int i = 0;
00273   
00274   while ((i < SYS_UNKNOWN) && strcasecmp(table, sysTableNames[i])) i++;
00275   
00276   return((SysTableType) i );
00277 }
00278 
00279 //--------------------------
00280 bool PBMSSystemTables::isSystemTable(bool isPBMS, const char *table)
00281 {
00282   SysTableType i;
00283   
00284   i = pbms_systable_type(table);
00285 
00286   if (i == SYS_UNKNOWN)
00287     return false;
00288     
00289   return (pbms_internal_tables[i].is_pbms == isPBMS);
00290 }
00291 
00292 //--------------------------
00293 #ifdef DRIZZLED
00294 using namespace std;
00295 using namespace drizzled;
00296 #undef TABLE
00297 #undef Field
00298 static int pbms_create_proto_table(const char *engine_name, const char *name, DT_FIELD_INFO *info, DT_KEY_INFO *keys, drizzled::message::Table &table)
00299 {
00300   message::Table::Field *field;
00301   message::Table::Field::FieldConstraints *field_constraints;
00302   message::Table::Field::StringFieldOptions *string_field_options;
00303   message::Table::TableOptions *table_options;
00304 
00305   table.set_name(name);
00306   table.set_name(name);
00307   table.set_type(message::Table::STANDARD);
00308   table.mutable_engine()->set_name(engine_name);
00309   
00310   table_options = table.mutable_options();
00311   table_options->set_collation_id(my_charset_utf8_bin.number);
00312   table_options->set_collation(my_charset_utf8_bin.name);
00313   
00314   while (info->field_name) {  
00315     field= table.add_field();
00316     
00317     field->set_name(info->field_name);
00318     if (info->comment)
00319       field->set_comment(info->comment);
00320       
00321     field_constraints= field->mutable_constraints();
00322     if (info->field_flags & NOT_NULL_FLAG)
00323       field_constraints->set_is_notnull(true);
00324     
00325     if (info->field_flags & UNSIGNED_FLAG)
00326       field_constraints->set_is_unsigned(true);
00327     else
00328       field_constraints->set_is_unsigned(false);
00329 
00330     switch (info->field_type) {
00331       case DRIZZLE_TYPE_VARCHAR:
00332         string_field_options = field->mutable_string_options();
00333         
00334         field->set_type(message::Table::Field::VARCHAR);
00335         string_field_options->set_length(info->field_length);
00336         if (info->field_charset) {
00337           string_field_options->set_collation(info->field_charset->name);
00338           string_field_options->set_collation_id(info->field_charset->number);
00339         }
00340         break;
00341         
00342       case DRIZZLE_TYPE_LONG:
00343         field->set_type(message::Table::Field::INTEGER);
00344         break;
00345         
00346       case DRIZZLE_TYPE_DOUBLE:
00347         field->set_type(message::Table::Field::DOUBLE);
00348         break;
00349         
00350       case DRIZZLE_TYPE_LONGLONG:
00351         field->set_type(message::Table::Field::BIGINT);
00352         break;
00353         
00354       case DRIZZLE_TYPE_TIMESTAMP:
00355         field->set_type(message::Table::Field::EPOCH);
00356         break;
00357         
00358       case DRIZZLE_TYPE_BLOB:
00359         field->set_type(message::Table::Field::BLOB);
00360         if (info->field_charset) {
00361           string_field_options = field->mutable_string_options();
00362           string_field_options->set_collation(info->field_charset->name);
00363           string_field_options->set_collation_id(info->field_charset->number);
00364         }
00365         break;
00366         
00367       default:
00368         assert(0); 
00369     }
00370     info++;
00371   }
00372   
00373       
00374   if (keys) {
00375     while (keys->key_name) {
00376       // To be done later. (maybe)
00377       keys++;
00378     }
00379   }
00380 
00381   return 0;
00382 }
00383 #define TABLE               drizzled::Table
00384 #define Field               drizzled::Field
00385 
00386 int PBMSSystemTables::getSystemTableInfo(const char *name, drizzled::message::Table &table)
00387 {
00388   int err = 1, i = 0;
00389       
00390   while (pbms_internal_tables[i].name) {
00391     if (strcasecmp(name, pbms_internal_tables[i].name) == 0){
00392       err = pbms_create_proto_table("PBMS", name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, table);
00393       break;
00394     }
00395     i++;
00396   }
00397   
00398   return err;
00399 }
00400 
00401 void PBMSSystemTables::getSystemTableNames(bool isPBMS, std::set<std::string> &set_of_names)
00402 {
00403   int i = 0;
00404       
00405   while (pbms_internal_tables[i].name) {
00406     if ( isPBMS == pbms_internal_tables[i].is_pbms){
00407       set_of_names.insert(pbms_internal_tables[i].name);
00408     }
00409     i++;
00410   }
00411   
00412 }
00413 
00414 #else // DRIZZLED
00415 //--------------------------
00416 static bool pbms_database_follder_exists( const char *db)
00417 {
00418   struct stat stat_info;  
00419   char path[PATH_MAX];
00420   
00421   if (!db)
00422     return false;
00423     
00424   cs_strcpy(PATH_MAX, path, ms_my_get_mysql_home_path());
00425   cs_add_name_to_path(PATH_MAX, path, db);
00426   
00427   if (stat(path, &stat_info) == 0)
00428     return(stat_info.st_mode & S_IFDIR);
00429     
00430   return false;
00431 }
00432 
00433 int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen)
00434 {
00435   int err = 1, i = 0;
00436   bool is_pbms = false;
00437 
00438   // Check that the database exists!
00439   if (!pbms_database_follder_exists(db))
00440     return err;
00441     
00442   is_pbms = (strcmp(db, "pbms") == 0);
00443     
00444   
00445   while (pbms_internal_tables[i].name) {
00446     if ((!strcasecmp(name, pbms_internal_tables[i].name)) && ( is_pbms == pbms_internal_tables[i].is_pbms)){
00447       err = ms_create_table_frm(hton, thd, db, name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, frmblob, frmlen);
00448       break;
00449     }
00450     i++;
00451   }
00452   
00453   return err;
00454 }
00455 #endif // DRIZZLED
00456 
00457 // Transfer any physical PBMS ststem tables to another database.
00458 void PBMSSystemTables::transferSystemTables(MSDatabase *dst_db, MSDatabase *src_db)
00459 {
00460   enter_();
00461   push_(dst_db);
00462   push_(src_db);
00463   
00464   MSHTTPHeaderTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
00465   MSVariableTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
00466   MSCloudTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
00467   MSBackupTable::transferTable(RETAIN(dst_db), RETAIN(src_db));
00468   
00469   release_(src_db);
00470   release_(dst_db);
00471   exit_();
00472 }
00473 
00474 //----------------
00475 void PBMSSystemTables::removeSystemTables(CSString *db_path)
00476 {
00477   enter_();
00478   push_(db_path);
00479   
00480   MSHTTPHeaderTable::removeTable(RETAIN(db_path));
00481   MSVariableTable::removeTable(RETAIN(db_path));
00482   MSCloudTable::removeTable(RETAIN(db_path));
00483   MSBackupTable::removeTable(RETAIN(db_path));
00484   
00485   release_(db_path);
00486   exit_();
00487 }
00488 
00489 //----------------
00490 bool PBMSSystemTables::try_loadSystemTables(CSThread *self, int i, MSDatabase *db)
00491 {
00492   volatile bool rtc = true;
00493   try_(a) {
00494     switch (i) {
00495       case 0:
00496         MSHTTPHeaderTable::loadTable(RETAIN(db));
00497         break;
00498       case 1:
00499         MSCloudTable::loadTable(RETAIN(db));
00500         break;
00501       case 2:
00502         MSBackupTable::loadTable(RETAIN(db));
00503         break;
00504       case 3:
00505         // Variable must be loaded after cloud and backup info
00506         // incase BLOB recovery is required.
00507         MSVariableTable::loadTable(RETAIN(db)); 
00508         break;
00509         
00510       default:
00511         ASSERT(false);
00512     }
00513     rtc = false;
00514   }
00515   catch_(a);
00516   cont_(a);
00517   return rtc;
00518 }
00519 //----------------
00520 void PBMSSystemTables::loadSystemTables(MSDatabase *db)
00521 {
00522   enter_();
00523   push_(db);
00524   
00525   for ( int i = 0; i < 4; i++) {
00526     if (try_loadSystemTables(self, i, db))
00527       self->logException();     
00528   }
00529   
00530   release_(db);
00531   exit_();
00532 }
00533 
00534 //----------------
00535 // Dump all the system tables into one buffer.
00536 typedef struct {
00537   CSDiskValue4 size_4;
00538   CSDiskValue1 tab_id_1;
00539   CSDiskValue4 tab_version_4;
00540 } DumpHeaderRec, *DumpHeaderPtr;
00541 
00542 typedef union {
00543     char *rec_chars;
00544     const char *rec_cchars;
00545     DumpHeaderPtr dumpHeader;
00546 } DumpDiskData;
00547 
00548 CSStringBuffer *PBMSSystemTables::dumpSystemTables(MSDatabase *db)
00549 {
00550   CSStringBuffer *sysDump, *tabDump = NULL;
00551   uint32_t size, pos, tab_version;
00552   uint8_t tab_id = 0;
00553   DumpDiskData  d;
00554 
00555   enter_();
00556   push_(db);
00557   new_(sysDump, CSStringBuffer());
00558   push_(sysDump);
00559   pos = 0;
00560   
00561   for ( int i = 0; i < 4; i++) {
00562     switch (i) {
00563       case 0:
00564         tabDump = MSHTTPHeaderTable::dumpTable(RETAIN(db));
00565         tab_id = MSHTTPHeaderTable::tableID;
00566         tab_version = MSHTTPHeaderTable::tableVersion;
00567         break;
00568       case 1:
00569         tabDump = MSCloudTable::dumpTable(RETAIN(db));
00570         tab_id = MSCloudTable::tableID;
00571         tab_version = MSCloudTable::tableVersion;
00572         break;
00573       case 2:
00574         tabDump = MSBackupTable::dumpTable(RETAIN(db));
00575         tab_id = MSBackupTable::tableID;
00576         tab_version = MSBackupTable::tableVersion;
00577         break;
00578       case 3:
00579         tabDump = MSVariableTable::dumpTable(RETAIN(db)); // Dump the variables table last.
00580         tab_id = MSVariableTable::tableID;
00581         tab_version = MSVariableTable::tableVersion;
00582         break;
00583         
00584       default:
00585         ASSERT(false);
00586     }
00587     
00588     push_(tabDump);
00589     size = tabDump->length();
00590     
00591     // Grow the buffer for the header
00592     sysDump->setLength(pos + sizeof(DumpHeaderRec));
00593     
00594     // Add the dump header 
00595     d.rec_chars = sysDump->getBuffer(pos); 
00596     CS_SET_DISK_4(d.dumpHeader->size_4, size);;   
00597     CS_SET_DISK_1(d.dumpHeader->tab_id_1, tab_id);    
00598     CS_SET_DISK_4(d.dumpHeader->tab_version_4, tab_version);  
00599   
00600     sysDump->append(tabDump->getBuffer(0), size);
00601     pos += size + sizeof(DumpHeaderRec);
00602     release_(tabDump);
00603   }
00604   
00605   
00606   pop_(sysDump);
00607   release_(db);
00608   return_(sysDump);
00609 }
00610 
00611 //----------------
00612 void PBMSSystemTables::restoreSystemTables(MSDatabase *db, const char *data, size_t size)
00613 {
00614   uint32_t tab_size, tab_version;
00615   uint8_t tab_id;
00616   DumpDiskData  d;
00617 
00618   enter_();
00619   push_(db);
00620   
00621   while  ( size >= sizeof(DumpHeaderRec)) {
00622     d.rec_cchars = data;
00623     tab_size = CS_GET_DISK_4(d.dumpHeader->size_4);
00624     tab_id = CS_GET_DISK_1(d.dumpHeader->tab_id_1);     
00625     tab_version = CS_GET_DISK_4(d.dumpHeader->tab_version_4);   
00626     data += sizeof(DumpHeaderRec);
00627     size -= sizeof(DumpHeaderRec);
00628     
00629     if (size < tab_size) {
00630       CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "PBMS system table restore data truncated.");
00631     }
00632     
00633     switch (tab_id) {
00634       case MSHTTPHeaderTable::tableID:
00635         if (MSHTTPHeaderTable::tableVersion == tab_version)
00636           MSHTTPHeaderTable::restoreTable(RETAIN(db), data, tab_size);
00637         else
00638           CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "METADATA_HEADER_NAME" failed, incompatible table version" );
00639         break;
00640       case MSCloudTable::tableID:
00641         if (MSCloudTable::tableVersion == tab_version)
00642           MSCloudTable::restoreTable(RETAIN(db), data, tab_size);
00643         else
00644           CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "CLOUD_TABLE_NAME" failed, incompatible table version" );
00645         break;
00646       case MSBackupTable::tableID:
00647         if (MSBackupTable::tableVersion == tab_version)
00648           MSBackupTable::restoreTable(RETAIN(db), data, tab_size);
00649         else
00650           CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "BACKUP_TABLE_NAME" failed, incompatible table version" );
00651         break;        
00652       case MSVariableTable::tableID:
00653         if (MSVariableTable::tableVersion == tab_version)
00654           MSVariableTable::restoreTable(RETAIN(db), data, tab_size);
00655         else
00656           CSException::logException(CS_CONTEXT, MS_ERR_SYSTAB_VERSION, "Restore "VARIABLES_TABLE_NAME" failed, incompatible table version" );
00657         break;
00658       default:
00659         ASSERT(false);
00660     }
00661      data += tab_size;
00662      size -= tab_size;
00663   }
00664   
00665   if (size) {
00666     CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "PBMS trailing garbage in system table restore data being ignored.");
00667   }
00668     
00669   release_(db);
00670   exit_();
00671 }
00672 
00673 /*
00674  * -------------------------------------------------------------------------
00675  * MYSQL UTILITIES
00676  */
00677 
00678 void MSOpenSystemTable::setNotNullInRecord(Field *field, char *record)
00679 {
00680 #ifdef DRIZZLED
00681   if (field->null_ptr)
00682     record[(uint) (field->null_ptr - (uchar *) field->getTable()->getInsertRecord())] &= (uchar) ~field->null_bit;
00683 #else
00684   if (field->null_ptr)
00685     record[(uint) (field->null_ptr - (uchar *) field->table->record[0])] &= (uchar) ~field->null_bit;
00686 #endif
00687 }
00688 
00689 /*
00690  * -------------------------------------------------------------------------
00691  * OPEN SYSTEM TABLES
00692  */
00693 
00694 MSOpenSystemTable::MSOpenSystemTable(MSSystemTableShare *share, TABLE *table):
00695 CSRefObject()
00696 {
00697   myShare = share;
00698   mySQLTable = table;
00699 }
00700 
00701 MSOpenSystemTable::~MSOpenSystemTable()
00702 {
00703   MSSystemTableShare::releaseSystemTable(this);
00704 }
00705 
00706 
00707 /*
00708  * -------------------------------------------------------------------------
00709  * REPOSITORY TABLE
00710  */
00711 
00712 MSRepositoryTable::MSRepositoryTable(MSSystemTableShare *share, TABLE *table):
00713 MSOpenSystemTable(share, table),
00714 iCompactor(NULL),
00715 iRepoFile(NULL),
00716 iBlobBuffer(NULL)
00717 {
00718 }
00719 
00720 //-----------------------
00721 MSRepositoryTable::~MSRepositoryTable()
00722 {
00723   unuse();
00724   
00725   if (iBlobBuffer)
00726     iBlobBuffer->release();
00727 }
00728 
00729 //-----------------------
00730 void MSRepositoryTable::use()
00731 {
00732   MSDatabase *db;
00733   enter_();
00734 
00735   if (!iBlobBuffer)
00736     new_(iBlobBuffer, CSStringBuffer(20));
00737     
00738   db = myShare->mySysDatabase;
00739   if ((iCompactor = db->getCompactorThread())) {
00740     if (iCompactor->isMe(self))
00741       iCompactor = NULL;
00742     else {
00743       iCompactor->retain();
00744       iCompactor->suspend();
00745     }
00746   }
00747   exit_();
00748 }
00749 
00750 //-----------------------
00751 void MSRepositoryTable::unuse()
00752 {
00753   if (iCompactor) {
00754     iCompactor->resume();
00755     iCompactor->release();
00756     iCompactor = NULL;
00757   }
00758   if (iRepoFile) {
00759     iRepoFile->release();
00760     iRepoFile = NULL;
00761   }
00762   if (iBlobBuffer)
00763     iBlobBuffer->clear();
00764 }
00765 
00766 
00767 //-----------------------
00768 void MSRepositoryTable::seqScanInit()
00769 {
00770   enter_();
00771   
00772   // Flush all committed transactions to the repository file.
00773   MSTransactionManager::flush();
00774 
00775   iRepoIndex = 0;
00776   iRepoOffset = 0;
00777     
00778   exit_();
00779 }
00780 
00781 //-----------------------
00782 bool MSRepositoryTable::resetScan(bool positioned, uint32_t repo_index)
00783 {
00784   if (positioned) {
00785     if (iRepoFile && (repo_index != iRepoIndex)) {
00786       iRepoFile->release();
00787       iRepoFile = NULL;
00788     }
00789     
00790     iRepoIndex = repo_index;
00791   }
00792   if (iRepoFile) 
00793     return true;
00794     
00795   enter_();
00796   MSRepository  *repo = NULL;
00797   CSSyncVector  *repo_list = myShare->mySysDatabase->getRepositoryList();
00798 
00799   lock_(repo_list);
00800   for (; iRepoIndex<repo_list->size(); iRepoIndex++) {
00801     if ((repo = (MSRepository *) repo_list->get(iRepoIndex))) {
00802       iRepoFile = repo->openRepoFile();
00803       break;
00804     }
00805   }
00806   unlock_(repo_list);
00807   
00808   if (!iRepoFile)
00809     return_(false);
00810   iRepoFileSize = repo->getRepoFileSize();
00811   if ((!iRepoOffset) || !positioned) 
00812     iRepoOffset = repo->getRepoHeadSize();
00813     
00814   iRepoCurrentOffset = iRepoOffset;
00815     
00816   return_(true);
00817 }
00818 
00819 //-----------------------
00820 bool MSRepositoryTable::seqScanNext(char *buf)
00821 {
00822 
00823   enter_();
00824   iRepoCurrentOffset = iRepoOffset;
00825   
00826   if (returnSubRecord(buf))
00827     goto exit;
00828 
00829   restart:
00830   if ((!iRepoFile) && !MSRepositoryTable::resetScan(false))
00831     return false;
00832 
00833   while (iRepoOffset < iRepoFileSize) {
00834     if (returnRecord(buf))
00835       goto exit;
00836   }
00837 
00838   if (iRepoFile) {
00839     iRepoFile->release();
00840     iRepoFile = NULL;
00841     iRepoOffset = 0;
00842   }
00843   iRepoIndex++;
00844   goto restart;
00845 
00846   exit:
00847   return_(true);
00848 }
00849 
00850 //-----------------------
00851 int MSRepositoryTable::getRefLen()
00852 {
00853   return sizeof(iRepoIndex) + sizeof(iRepoOffset);
00854 }
00855 
00856 
00857 //-----------------------
00858 void MSRepositoryTable::seqScanPos(unsigned char *pos)
00859 {
00860   mi_int4store(pos, iRepoIndex); pos +=4;
00861   mi_int8store(pos, iRepoCurrentOffset);
00862 }
00863 
00864 //-----------------------
00865 void MSRepositoryTable::seqScanRead(uint32_t repo, uint64_t offset, char *buf)
00866 {
00867   iRepoOffset = offset;
00868 
00869   if (!resetScan(true, repo))
00870     return;
00871     
00872   seqScanNext(buf);
00873 }
00874 
00875 //-----------------------
00876 void MSRepositoryTable::seqScanRead(unsigned char *pos, char *buf)
00877 {
00878   seqScanRead(mi_uint4korr( pos), mi_uint8korr(pos +4), buf);
00879 }
00880 
00881 //-----------------------
00882 bool MSRepositoryTable::returnRecord(char *buf)
00883 {
00884   CSMutex     *lock;
00885   MSBlobHeadRec blob;
00886   uint16_t      head_size;
00887   uint64_t      blob_size;
00888   int       ref_count;
00889   size_t      ref_size;
00890   uint8_t     status;
00891 
00892   enter_();
00893   retry_read:
00894   lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
00895   lock_(lock);
00896   if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
00897     unlock_(lock);
00898     iRepoOffset = iRepoFileSize;
00899     return_(false);
00900   }
00901   head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00902   blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
00903   ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
00904   ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
00905   status = CS_GET_DISK_1(blob.rb_status_1);
00906   if (ref_count <= 0 || ref_size == 0 ||
00907     head_size < iRepoFile->myRepo->getRepoBlobHeadSize() + ref_count * ref_size || 
00908     !VALID_BLOB_STATUS(status)) {
00909     /* Can't be true. Assume this is garbage! */
00910     unlock_(lock);
00911     iRepoOffset++;
00912     goto retry_read;
00913   }
00914 
00915   if (IN_USE_BLOB_STATUS(status)) {
00916     unlock_(lock);
00917     if (!returnRow(&blob, buf)) {
00918       /* This record may not have had any data of interest. */
00919       iRepoOffset++;
00920       goto retry_read;
00921     }
00922     iRepoOffset += head_size + blob_size;
00923     return_(true);
00924   }
00925   unlock_(lock);
00926   iRepoOffset += head_size + blob_size;
00927   return_(false);
00928 }
00929 
00930 //-----------------------
00931 bool MSRepositoryTable::returnSubRecord(char *)
00932 {
00933   return false;
00934 }
00935 
00936 //-----------------------
00937 bool MSRepositoryTable::returnRow(MSBlobHeadPtr blob, char *buf)
00938 {
00939   TABLE   *table = mySQLTable;
00940   uint8_t   storage_type;
00941   uint32_t    access_count;
00942   uint32_t    last_access;
00943   uint32_t    last_ref;
00944   uint32_t    creation_time;
00945   uint32_t    access_code;
00946   uint16_t    head_size;
00947   uint16_t    alias_offset;
00948   uint64_t    blob_size;
00949   Field   *curr_field;
00950   byte    *save;
00951   MY_BITMAP *save_write_set;
00952   
00953   enter_();
00954 
00955   storage_type = CS_GET_DISK_1(blob->rb_storage_type_1);
00956   last_access = CS_GET_DISK_4(blob->rb_last_access_4);
00957   access_count = CS_GET_DISK_4(blob->rb_access_count_4);
00958   last_ref = CS_GET_DISK_4(blob->rb_last_ref_4);
00959   creation_time = CS_GET_DISK_4(blob->rb_create_time_4);
00960   head_size = CS_GET_DISK_2(blob->rb_head_size_2);
00961   blob_size = CS_GET_DISK_6(blob->rb_blob_repo_size_6);
00962   access_code = CS_GET_DISK_4(blob->rb_auth_code_4);
00963   alias_offset = CS_GET_DISK_2(blob->rb_alias_offset_2);
00964 
00965   /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
00966    * I use store()!??
00967    * But I want to use it! :(
00968    */
00969   save_write_set = table->write_set;
00970   table->write_set = NULL;
00971 
00972 #ifdef DRIZZLED
00973   memset(buf, 0xFF, table->getNullBytes());
00974 #else
00975   memset(buf, 0xFF, table->s->null_bytes);
00976 #endif
00977   for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
00978     curr_field = *field;
00979     save = curr_field->ptr;
00980 #if MYSQL_VERSION_ID < 50114
00981     curr_field->ptr = (byte *) buf + curr_field->offset();
00982 #else
00983 #ifdef DRIZZLED
00984     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
00985 #else
00986     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
00987 #endif
00988 #endif
00989     switch (curr_field->field_name[0]) {
00990       case 'A':
00991         switch (curr_field->field_name[9]) {
00992           case 'd':
00993             ASSERT(strcmp(curr_field->field_name, "Access_code") == 0);
00994             curr_field->store(access_code, true);
00995             setNotNullInRecord(curr_field, buf);
00996             break;
00997           case 'u':
00998             ASSERT(strcmp(curr_field->field_name, "Access_count") == 0);
00999             curr_field->store(access_count, true);
01000             setNotNullInRecord(curr_field, buf);
01001             break;
01002         }
01003         break;
01004       case 'R':
01005         switch (curr_field->field_name[6]) {
01006           case 't':
01007             // Repository_id     INT
01008             ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
01009             curr_field->store(iRepoFile->myRepo->getRepoID(), true);
01010             setNotNullInRecord(curr_field, buf);
01011             break;
01012           case 'l':
01013             // Repo_blob_offset  BIGINT
01014             ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
01015             curr_field->store(iRepoOffset, true);
01016             setNotNullInRecord(curr_field, buf);
01017             break;
01018         }
01019         break;
01020       case 'B':
01021         switch (curr_field->field_name[5]) {
01022           case 's':
01023             // Blob_size         BIGINT
01024             ASSERT(strcmp(curr_field->field_name, "Blob_size") == 0);
01025             curr_field->store(blob_size, true);
01026             setNotNullInRecord(curr_field, buf);
01027             break;
01028           case 'a':
01029             // Blob_alias         
01030             ASSERT(strcmp(curr_field->field_name, "Blob_alias") == 0);
01031 #ifdef HAVE_ALIAS_SUPPORT
01032             if (alias_offset) {
01033               char blob_alias[BLOB_ALIAS_LENGTH +1];
01034               CSMutex *lock;
01035               uint64_t rsize;
01036               
01037               lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
01038               lock_(lock);
01039               rsize = iRepoFile->read(blob_alias, iRepoOffset + alias_offset, BLOB_ALIAS_LENGTH, 0);
01040               unlock_(lock);
01041               blob_alias[rsize] = 0;
01042               curr_field->store(blob_alias, strlen(blob_alias), &my_charset_utf8_general_ci);
01043               setNotNullInRecord(curr_field, buf);
01044             } else {
01045               curr_field->store((uint64_t) 0, true);
01046             }
01047 #else
01048             curr_field->store((uint64_t) 0, true);
01049 #endif
01050             
01051             break;
01052         }
01053 
01054         break;
01055       case 'M': // MD5_Checksum
01056         if (storage_type == MS_STANDARD_STORAGE) {
01057           char checksum[sizeof(Md5Digest) *2 +1];
01058           
01059           ASSERT(strcmp(curr_field->field_name, "MD5_Checksum") == 0);
01060           cs_bin_to_hex(sizeof(Md5Digest) *2 +1, checksum, sizeof(Md5Digest), &(blob->rb_blob_checksum_md5d));
01061           curr_field->store(checksum, sizeof(Md5Digest) *2, system_charset_info);
01062           setNotNullInRecord(curr_field, buf);
01063           
01064         } else
01065           curr_field->store((uint64_t) 0, true);
01066       
01067         break;
01068       case 'H':
01069         // Head_size         SMALLINT UNSIGNED
01070         ASSERT(strcmp(curr_field->field_name, "Head_size") == 0);
01071         curr_field->store(head_size, true);
01072         setNotNullInRecord(curr_field, buf);
01073         break;
01074       case 'C':
01075         // Creation_time     TIMESTAMP
01076         ASSERT(strcmp(curr_field->field_name, "Creation_time") == 0);
01077         curr_field->store(ms_my_1970_to_mysql_time(creation_time), true);
01078         setNotNullInRecord(curr_field, buf);
01079         break;
01080       case 'L':
01081         switch (curr_field->field_name[5]) {
01082           case 'r':
01083             // Last_ref_time     TIMESTAMP
01084             ASSERT(strcmp(curr_field->field_name, "Last_ref_time") == 0);
01085             curr_field->store(ms_my_1970_to_mysql_time(last_ref), true);
01086             setNotNullInRecord(curr_field, buf);
01087             break;
01088           case 'a':
01089             // Last_access_time  TIMESTAMP
01090             ASSERT(strcmp(curr_field->field_name, "Last_access_time") == 0);
01091             curr_field->store(ms_my_1970_to_mysql_time(last_access), true);
01092             setNotNullInRecord(curr_field, buf);
01093             break;
01094         }
01095         break;
01096     }
01097     curr_field->ptr = save;
01098   }
01099 
01100   table->write_set = save_write_set;
01101   return_(true);
01102 }
01103 
01104 /*
01105  * -------------------------------------------------------------------------
01106  * BLOB DATA TABLE
01107  */
01108 //-----------------------
01109 MSBlobDataTable::MSBlobDataTable(MSSystemTableShare *share, TABLE *table):MSRepositoryTable(share, table)
01110 {
01111 }
01112 
01113 //-----------------------
01114 MSBlobDataTable::~MSBlobDataTable()
01115 { 
01116 }
01117 
01118 //-----------------------
01119 bool MSBlobDataTable::returnRow(MSBlobHeadPtr blob, char *buf)
01120 {
01121   TABLE   *table = mySQLTable;
01122   uint8_t   blob_type;
01123   uint16_t    head_size;
01124   uint64_t    blob_size;
01125   uint32    len;
01126   Field   *curr_field;
01127   byte    *save;
01128   MY_BITMAP *save_write_set;
01129 
01130   head_size = CS_GET_DISK_2(blob->rb_head_size_2);
01131   blob_size = CS_GET_DISK_6(blob->rb_blob_repo_size_6);
01132   blob_type = CS_GET_DISK_1(blob->rb_storage_type_1);
01133 
01134   /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
01135    * I use store()!??
01136    * But I want to use it! :(
01137    */
01138   save_write_set = table->write_set;
01139   table->write_set = NULL;
01140 
01141 #ifdef DRIZZLED
01142   memset(buf, 0xFF, table->getNullBytes());
01143 #else
01144   memset(buf, 0xFF, table->s->null_bytes);
01145 #endif
01146   for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
01147     curr_field = *field;
01148     save = curr_field->ptr;
01149 #if MYSQL_VERSION_ID < 50114
01150     curr_field->ptr = (byte *) buf + curr_field->offset();
01151 #else
01152 #ifdef DRIZZLED
01153     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
01154 #else
01155     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
01156 #endif
01157 #endif
01158     switch (curr_field->field_name[0]) {
01159       case 'R':
01160         switch (curr_field->field_name[6]) {
01161           case 't':
01162             // Repository_id     INT
01163             ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
01164             curr_field->store(iRepoFile->myRepo->getRepoID(), true);
01165             setNotNullInRecord(curr_field, buf);
01166             break;
01167           case 'l':
01168             // Repo_blob_offset  BIGINT
01169             ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
01170             curr_field->store(iRepoOffset, true);
01171             setNotNullInRecord(curr_field, buf);
01172             break;
01173         }
01174         break;
01175       case 'B':
01176         // Blob_data         LONGBLOB
01177         ASSERT(strcmp(curr_field->field_name, "Blob_data") == 0);
01178         if (blob_size <= 0xFFFFFFF) {
01179           iBlobBuffer->setLength((uint32_t) blob_size);
01180           if (BLOB_IN_REPOSITORY(blob_type)) {
01181             len = iRepoFile->read(iBlobBuffer->getBuffer(0), iRepoOffset + head_size, (size_t) blob_size, 0);
01182           } else {
01183             CloudDB *blobCloud = myShare->mySysDatabase->myBlobCloud;
01184             CloudKeyRec key;
01185             ASSERT(blobCloud != NULL);
01186             
01187             MSRepoFile::getBlobKey(blob, &key);
01188             
01189             len = blobCloud->cl_getData(&key, iBlobBuffer->getBuffer(0), blob_size);
01190           }
01191           ((Field_blob *) curr_field)->set_ptr(len, (byte *) iBlobBuffer->getBuffer(0));
01192           setNotNullInRecord(curr_field, buf);
01193         }
01194         break;
01195     }
01196     curr_field->ptr = save;
01197   }
01198 
01199   table->write_set = save_write_set;
01200   return true;
01201 }
01202 
01203 #ifdef HAVE_ALIAS_SUPPORT
01204 /*
01205  * -------------------------------------------------------------------------
01206  * Alias DATA TABLE
01207  */
01208 bool MSBlobAliasTable::returnRow(MSBlobHeadPtr blob, char *buf)
01209 {
01210   TABLE   *table = mySQLTable;
01211   Field   *curr_field;
01212   byte    *save;
01213   MY_BITMAP *save_write_set;
01214   uint16_t    alias_offset; 
01215   char    blob_alias[BLOB_ALIAS_LENGTH +1];
01216   CSMutex   *lock;
01217   uint64_t    rsize;
01218   enter_();
01219   
01220   alias_offset = CS_GET_DISK_2(blob->rb_alias_offset_2);
01221 
01222   if (!alias_offset)
01223     return_(false);
01224 
01225   lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
01226   lock_(lock);
01227   rsize = iRepoFile->read(blob_alias, iRepoOffset + alias_offset, BLOB_ALIAS_LENGTH, 0);
01228   unlock_(lock);
01229   blob_alias[rsize] = 0;
01230   
01231   /* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
01232    * I use store()!??
01233    * But I want to use it! :(
01234    */
01235   save_write_set = table->write_set;
01236   table->write_set = NULL;
01237 
01238 #ifdef DRIZZLED
01239   memset(buf, 0xFF, table->getNullBytes());
01240 #else
01241   memset(buf, 0xFF, table->s->null_bytes);
01242 #endif
01243   for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
01244     curr_field = *field;
01245     save = curr_field->ptr;
01246 #if MYSQL_VERSION_ID < 50114
01247     curr_field->ptr = (byte *) buf + curr_field->offset();
01248 #else
01249 #ifdef DRIZZLED
01250     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
01251 #else
01252     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
01253 #endif
01254 #endif
01255     switch (curr_field->field_name[0]) {
01256       case 'R':
01257         switch (curr_field->field_name[6]) {
01258           case 't':
01259             // Repository_id     INT
01260             ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
01261             curr_field->store(iRepoFile->myRepo->getRepoID(), true);
01262             setNotNullInRecord(curr_field, buf);
01263             break;
01264           case 'l':
01265             // Repo_blob_offset  BIGINT
01266             ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
01267             curr_field->store(iRepoOffset, true);
01268             setNotNullInRecord(curr_field, buf);
01269             break;
01270         }
01271         break;
01272       case 'B':
01273         // Blob_alias         
01274         ASSERT(strcmp(curr_field->field_name, "Blob_alias") == 0);
01275         curr_field->store(blob_alias, strlen(blob_alias), &UTF8_CHARSET);
01276         setNotNullInRecord(curr_field, buf);
01277         
01278         break;
01279     }
01280     curr_field->ptr = save;
01281   }
01282 
01283   table->write_set = save_write_set;
01284   return_(true);
01285 }
01286 
01287 #endif
01288 
01289 /*
01290  * -------------------------------------------------------------------------
01291  * REFERENCE TABLE
01292  */
01293 
01294 MSReferenceTable::MSReferenceTable(MSSystemTableShare *share, TABLE *table):
01295 MSRepositoryTable(share, table),
01296 iRefDataList(NULL), 
01297 iRefDataSize(0), 
01298 iRefDataUsed(0),
01299 iRefDataPos(0),
01300 iRefOpenTable(NULL),
01301 iRefTempLog(NULL)
01302 {
01303 }
01304 
01305 MSReferenceTable::~MSReferenceTable()
01306 {
01307   if (iRefDataList)
01308     cs_free(iRefDataList);
01309   if (iRefOpenTable)
01310     iRefOpenTable->returnToPool();
01311   if (iRefTempLog)
01312     iRefTempLog->release();
01313 }
01314 
01315 void MSReferenceTable::unuse()
01316 {
01317   MSRepositoryTable::unuse();
01318   if (iRefDataList) {
01319     cs_free(iRefDataList);
01320     iRefDataList = NULL;
01321   }
01322   iRefDataSize = 0;
01323   if (iRefOpenTable) {
01324     iRefOpenTable->returnToPool();
01325     iRefOpenTable = NULL;
01326   }
01327   if (iRefTempLog) {
01328     iRefTempLog->release();
01329     iRefTempLog = NULL;
01330   }
01331 }
01332 
01333 
01334 void MSReferenceTable::seqScanInit()
01335 {
01336   MSRepositoryTable::seqScanInit();
01337   iRefDataUsed = 0;
01338   iRefDataPos = 0;
01339 }
01340 
01341 int MSReferenceTable::getRefLen()
01342 {
01343   return sizeof(iRefDataUsed) + sizeof(iRefDataPos) + sizeof(iRefCurrentIndex) + sizeof(iRefCurrentOffset);
01344 }
01345 
01346 void MSReferenceTable::seqScanPos(unsigned char *pos)
01347 {
01348   mi_int4store(pos, iRefCurrentDataPos); pos +=4;
01349   mi_int4store(pos, iRefCurrentDataUsed);pos +=4; 
01350   mi_int4store(pos, iRefCurrentIndex); pos +=4;
01351   mi_int8store(pos, iRefCurrentOffset);
01352 }
01353 
01354 void MSReferenceTable::seqScanRead(unsigned char *pos, char *buf)
01355 {
01356   iRefDataPos = mi_uint4korr( pos); pos +=4;
01357   iRefDataUsed = mi_uint4korr(pos); pos +=4;
01358   iRefBlobRepo = mi_uint4korr(pos); pos +=4;
01359   iRefBlobOffset = mi_uint8korr(pos);
01360   MSRepositoryTable::seqScanRead(iRefBlobRepo, iRefBlobOffset, buf);
01361 }
01362 
01363 bool MSReferenceTable::seqScanNext(char *buf)
01364 {
01365   iRefCurrentDataPos = iRefDataPos;
01366   iRefCurrentDataUsed = iRefDataUsed;
01367   
01368   // Reset the position
01369   return MSRepositoryTable::seqScanNext(buf);
01370 }
01371 // select * from pbms_reference order by blob_size DESC;
01372 // select * from pbms_reference order by Table_name DESC;
01373 
01374 bool MSReferenceTable::resetScan(bool positioned, uint32_t repo_index)
01375 {
01376   CSMutex       *lock;
01377   MSBlobHeadRec   blob;
01378   uint16_t      head_size;
01379   uint64_t      blob_size;
01380   MSRepoPointersRec ptr;
01381   size_t        ref_size;
01382   uint32_t      tab_index;
01383   uint32_t      ref_count;
01384   uint8_t       status;
01385 
01386   enter_();
01387   
01388   if (!MSRepositoryTable::resetScan(positioned, repo_index))
01389     return_(false);
01390   
01391   retry_read:
01392   lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
01393   lock_(lock);
01394   if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
01395     unlock_(lock);
01396     iRepoOffset = iRepoFileSize;
01397     return_(false);
01398   }
01399   head_size = CS_GET_DISK_2(blob.rb_head_size_2);
01400   blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
01401   ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
01402   ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
01403   status = CS_GET_DISK_1(blob.rb_status_1);
01404   if (ref_count <= 0 || ref_size == 0 ||
01405     head_size < iRepoFile->myRepo->getRepoBlobHeadSize() + ref_count * ref_size || 
01406     !VALID_BLOB_STATUS(status)) {
01407     /* Can't be true. Assume this is garbage! */
01408     unlock_(lock);
01409     iRepoOffset++;
01410     goto retry_read;
01411   }
01412 
01413   if (IN_USE_BLOB_STATUS(status)) {
01414     iBlobBuffer->setLength((uint32_t) head_size);
01415     if (iRepoFile->read(iBlobBuffer->getBuffer(0), iRepoOffset, head_size, 0) < head_size) {
01416       unlock_(lock);
01417       iRepoOffset = iRepoFileSize;
01418       return_(false);
01419     }
01420     unlock_(lock);
01421 
01422     iRefAuthCode = CS_GET_DISK_4(blob.rb_auth_code_4);
01423     iRefBlobSize = CS_GET_DISK_6(blob.rb_blob_data_size_6);;
01424     iRefBlobRepo = iRepoFile->myRepo->getRepoID();
01425     iRefBlobOffset = iRepoOffset;
01426 
01427     if (ref_count > iRefDataSize) {
01428       cs_realloc((void **) &iRefDataList, sizeof(MSRefDataRec) * ref_count);
01429       iRefDataSize = ref_count;
01430     }
01431     
01432     if (!positioned) 
01433       iRefDataPos = 0;
01434 
01435     // When ever the data position is reset the current location information
01436     // must also be reset so that it is consisent with the data position.
01437     iRefCurrentDataPos = iRefDataPos;
01438     iRefCurrentOffset = iRepoOffset;
01439     iRefCurrentIndex = iRepoIndex;
01440     iRefCurrentDataUsed = iRefDataUsed = ref_count;
01441     memset(iRefDataList, 0, sizeof(MSRefDataRec) * ref_count);
01442 
01443     uint32_t h = iRepoFile->myRepo->getRepoBlobHeadSize();
01444     ptr.rp_chars = iBlobBuffer->getBuffer(0) + h;
01445     for (uint32_t i=0; i<ref_count; i++) {
01446       switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
01447         case MS_BLOB_FREE_REF:
01448           break;
01449         case MS_BLOB_TABLE_REF: // The blob is intended for a file but has not been inserted yet.
01450           iRefDataList[i].rd_tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
01451           iRefDataList[i].rd_blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
01452           iRefDataList[i].rd_col_index = INVALID_INDEX;  // Means not used
01453           break;
01454         case MS_BLOB_DELETE_REF:
01455           tab_index = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
01456           if (tab_index && (tab_index <= ref_count)) {
01457             tab_index--;
01458             iRefDataList[tab_index].rd_temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
01459             iRefDataList[tab_index].rd_temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
01460           }
01461           else if (tab_index == INVALID_INDEX) {
01462             /* The is a reference from the temporary log only!! */
01463             iRefDataList[i].rd_tab_id = 0xFFFFFFFF;  // Indicates no table
01464             iRefDataList[i].rd_blob_id = iRepoOffset;
01465             iRefDataList[i].rd_blob_ref_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);;
01466             iRefDataList[i].rd_col_index = INVALID_INDEX;  // Means not used
01467             iRefDataList[i].rd_temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
01468             iRefDataList[i].rd_temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
01469           }
01470           break;
01471         default:
01472           MSRepoTableRefPtr tab_ref;
01473 
01474           tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1;
01475           if (tab_index < ref_count) {
01476             tab_ref = (MSRepoTableRefPtr) (iBlobBuffer->getBuffer(0) + iRepoFile->myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
01477   
01478             iRefDataList[i].rd_tab_id = CS_GET_DISK_4(tab_ref->tr_table_id_4);
01479             iRefDataList[i].rd_blob_id = CS_GET_DISK_6(tab_ref->tr_blob_id_6);
01480             iRefDataList[i].rd_col_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_col_index_2);
01481             iRefDataList[i].rd_blob_ref_id = COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8));
01482 
01483             iRefDataList[tab_index].rd_ref_count++;
01484           }
01485           else {
01486             /* Can't be true. Assume this is garbage! */
01487             unlock_(lock);
01488             iRepoOffset++;
01489             goto retry_read;
01490           }
01491           break;
01492       }
01493       ptr.rp_chars += ref_size;
01494     }
01495 
01496     iRepoOffset += head_size + blob_size;
01497 
01498     return_(true);
01499   }
01500   unlock_(lock);
01501   iRepoOffset += head_size + blob_size;
01502   return_(false);
01503 }
01504 
01505 bool MSReferenceTable::returnRecord(char *buf)
01506 {
01507   if (!resetScan(false))
01508     return false;
01509     
01510   return(returnSubRecord(buf));
01511 }
01512 
01513 bool MSReferenceTable::returnSubRecord(char *buf)
01514 {
01515   uint32_t i;
01516   
01517   while (iRefDataPos < iRefDataUsed) {
01518     i = iRefDataPos++;
01519     if (iRefDataList[i].rd_tab_id) {
01520       if (iRefDataList[i].rd_col_index == INVALID_INDEX) {
01521         /* This is a table reference */
01522         if (!iRefDataList[i].rd_ref_count || iRefDataList[i].rd_temp_log_id) {
01523           returnRow(&iRefDataList[i], buf);
01524           return true;
01525         }
01526       }
01527       else {
01528         /* This is an engine reference */
01529         returnRow(&iRefDataList[i], buf);
01530         return true;
01531       }
01532     }
01533   }
01534 
01535   return false;
01536 }
01537 
01538 void MSReferenceTable::returnRow(MSRefDataPtr ref_data, char *buf)
01539 {
01540   TABLE     *table = mySQLTable;
01541   Field     *curr_field;
01542   byte      *save;
01543   MY_BITMAP   *save_write_set;
01544   MY_BITMAP   *save_read_set;
01545   bool      have_times = false;
01546   time_t      delete_time;
01547   int32_t     countdown = 0;
01548 
01549   if (iRefOpenTable) {
01550     if (iRefOpenTable->getDBTable()->myTableID != ref_data->rd_tab_id) {
01551       iRefOpenTable->returnToPool();
01552       iRefOpenTable = NULL;
01553     }
01554   }
01555 
01556   if (!iRefOpenTable && ref_data->rd_tab_id != 0xFFFFFFFF)
01557     iRefOpenTable = MSTableList::getOpenTableByID(myShare->mySysDatabase->myDatabaseID, ref_data->rd_tab_id);
01558 
01559   if (ref_data->rd_temp_log_id) {
01560     if (iRefTempLog) {
01561       if (iRefTempLog->myTempLogID != ref_data->rd_temp_log_id) {
01562         iRefTempLog->release();
01563         iRefTempLog = NULL;
01564       }
01565     }
01566     if (!iRefTempLog)
01567       iRefTempLog = myShare->mySysDatabase->openTempLogFile(ref_data->rd_temp_log_id, NULL, NULL);
01568 
01569     if (iRefTempLog) {
01570       MSTempLogItemRec  log_item;
01571 
01572       if (iRefTempLog->read(&log_item, ref_data->rd_temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
01573         have_times = true;
01574         delete_time = CS_GET_DISK_4(log_item.ti_time_4);
01575         countdown = (int32_t) (delete_time + PBMSParameters::getTempBlobTimeout()) - (int32_t) time(NULL);
01576       }
01577     }
01578   }
01579 
01580   if (ref_data->rd_col_index != INVALID_INDEX) {
01581     if (iRefOpenTable) {
01582       if (iRefOpenTable->getDBTable()->isToDelete()) {
01583         have_times = true;
01584         iRefOpenTable->getDBTable()->getDeleteInfo(&ref_data->rd_temp_log_id, &ref_data->rd_temp_log_offset, &delete_time);
01585         ref_data->rd_col_index = INVALID_INDEX;
01586         countdown = (int32_t) (delete_time + PBMSParameters::getTempBlobTimeout()) - (int32_t) time(NULL);
01587       }
01588     }
01589     else
01590       ref_data->rd_col_index = INVALID_INDEX;
01591   }
01592 
01593   save_write_set = table->write_set;
01594   save_read_set = table->read_set;
01595   table->write_set = NULL;
01596   table->read_set = NULL;
01597 
01598 #ifdef DRIZZLED
01599   memset(buf, 0xFF, table->getNullBytes());
01600 #else
01601   memset(buf, 0xFF, table->s->null_bytes);
01602 #endif
01603   for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
01604     curr_field = *field;
01605     save = curr_field->ptr;
01606 #if MYSQL_VERSION_ID < 50114
01607     curr_field->ptr = (byte *) buf + curr_field->offset();
01608 #else
01609 #ifdef DRIZZLED
01610     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
01611 #else
01612     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
01613 #endif
01614 #endif
01615     switch (curr_field->field_name[0]) {
01616       case 'B':
01617         switch (curr_field->field_name[5]) {
01618           case 'i':
01619             // Blob_id           BIGINT,
01620             ASSERT(strcmp(curr_field->field_name, "Blob_id") == 0);
01621             if (ref_data->rd_tab_id == 0xFFFFFFFF)
01622               curr_field->store(0, true);
01623             else {
01624               curr_field->store(ref_data->rd_blob_id, true);
01625               setNotNullInRecord(curr_field, buf);
01626             }
01627             break;
01628           case 'u':
01629             // Blob_url          VARCHAR(120),
01630             PBMSBlobURLRec blob_url;
01631 
01632             ASSERT(strcmp(curr_field->field_name, "Blob_url") == 0);
01633             if (ref_data->rd_tab_id != 0xFFFFFFFF) {
01634               iRefOpenTable->formatBlobURL(&blob_url, ref_data->rd_blob_id, iRefAuthCode, iRefBlobSize, ref_data->rd_blob_ref_id);
01635               curr_field->store(blob_url.bu_data, strlen(blob_url.bu_data) +1, &UTF8_CHARSET); // Include the null char in the url. This is the way it is stored in user tables.
01636               setNotNullInRecord(curr_field, buf);
01637             }
01638             break;
01639           case 's':
01640             // Blob_size         BIGINT,
01641             ASSERT(strcmp(curr_field->field_name, "Blob_size") == 0);
01642             curr_field->store(iRefBlobSize, true);
01643             setNotNullInRecord(curr_field, buf);
01644             break;
01645         }
01646         break;
01647       case 'C':
01648         // Column_ordinal       INT,
01649         ASSERT(strcmp(curr_field->field_name, "Column_ordinal") == 0);
01650         if (ref_data->rd_col_index != INVALID_INDEX) {
01651           curr_field->store(ref_data->rd_col_index +1, true);
01652           setNotNullInRecord(curr_field, buf);
01653         }
01654         break;
01655       case 'D':
01656         // Deletion_time     TIMESTAMP,
01657         ASSERT(strcmp(curr_field->field_name, "Deletion_time") == 0);
01658         if (have_times) {
01659           curr_field->store(ms_my_1970_to_mysql_time(delete_time), true);
01660           setNotNullInRecord(curr_field, buf);
01661         }
01662         break;
01663       case 'R':
01664         switch (curr_field->field_name[5]) {
01665           case 'i':
01666             // Repository_id     INT,
01667             ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
01668             curr_field->store(iRefBlobRepo, true);
01669             setNotNullInRecord(curr_field, buf);
01670             break;
01671           case 'b':
01672             // Repo_blob_offset  BIGINT,
01673             ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
01674             curr_field->store(iRefBlobOffset, true);
01675             setNotNullInRecord(curr_field, buf);
01676             break;
01677           case 'e':
01678             // Remove_in INT
01679             ASSERT(strcmp(curr_field->field_name, "Remove_in") == 0);
01680             if (have_times) {
01681               curr_field->store(countdown, false);
01682               setNotNullInRecord(curr_field, buf);
01683             }
01684             break;
01685         }
01686         break;
01687       case 'T':
01688         switch (curr_field->field_name[9]) {
01689           case 'e':
01690             // Table_name        CHAR(64),
01691             ASSERT(strcmp(curr_field->field_name, "Table_name") == 0);
01692             if (ref_data->rd_tab_id != 0xFFFFFFFF) {
01693               if (iRefOpenTable) {
01694                 CSString *table_name = iRefOpenTable->getDBTable()->getTableName();
01695                 curr_field->store(table_name->getCString(), table_name->length(), &UTF8_CHARSET);
01696               }
01697               else {
01698                 char buffer[MS_TABLE_NAME_SIZE];
01699                 
01700                 snprintf(buffer, MS_TABLE_NAME_SIZE, "Table #%"PRIu32"", ref_data->rd_tab_id);
01701                 curr_field->store(buffer, strlen(buffer), &UTF8_CHARSET);
01702               }
01703               setNotNullInRecord(curr_field, buf);
01704             }
01705             break;
01706           case 'i':
01707             // Temp_log_id       INT,
01708             ASSERT(strcmp(curr_field->field_name, "Temp_log_id") == 0);
01709             if (ref_data->rd_temp_log_id) {
01710               curr_field->store(ref_data->rd_temp_log_id, true);
01711               setNotNullInRecord(curr_field, buf);
01712             }
01713             break;
01714           case 'o':
01715             // Temp_log_offset   BIGINT
01716             ASSERT(strcmp(curr_field->field_name, "Temp_log_offset") == 0);
01717             if (ref_data->rd_temp_log_id) {
01718               curr_field->store(ref_data->rd_temp_log_offset, true);
01719               setNotNullInRecord(curr_field, buf);
01720             }
01721             break;
01722         }
01723         break;
01724     }
01725     curr_field->ptr = save;
01726   }
01727 
01728   table->write_set = save_write_set;
01729   table->read_set = save_read_set;
01730 }
01731 
01732 /*
01733  * -------------------------------------------------------------------------
01734  * META DATA TABLE
01735  */
01736 MSMetaDataTable::MSMetaDataTable(MSSystemTableShare *share, TABLE *table):
01737 MSRepositoryTable(share, table),
01738 iMetData(NULL), 
01739 iMetCurrentBlobRepo(0),
01740 iMetCurrentBlobOffset(0),
01741 iMetCurrentDataPos(0),
01742 iMetCurrentDataSize(0),
01743 iMetDataPos(0),
01744 iMetDataSize(0), 
01745 iMetBlobRepo(0),
01746 iMetBlobOffset(0),
01747 iMetStateSaved(false)
01748 {
01749 }
01750 
01751 MSMetaDataTable::~MSMetaDataTable()
01752 {
01753   if (iMetData) {
01754     iMetData->release();
01755     iMetData = NULL;
01756   }
01757 }
01758 
01759 MSMetaDataTable *MSMetaDataTable::newMSMetaDataTable(MSDatabase *db)
01760 {
01761   char path[PATH_MAX];
01762   
01763   cs_strcpy(PATH_MAX, path, db->myDatabasePath->getCString());
01764   db->release();
01765   cs_add_dir_char(PATH_MAX, path);
01766   cs_strcat(PATH_MAX, path, sysTableNames[SYS_META]);
01767   
01768   return  (MSMetaDataTable*) MSSystemTableShare::openSystemTable(path, NULL);
01769 }
01770 
01771 void MSMetaDataTable::use()
01772 {
01773   MSRepositoryTable::use();
01774   new_(iMetData, CSStringBuffer(80));
01775   iMetDataSize = 0;
01776 }
01777 
01778 void MSMetaDataTable::unuse()
01779 {
01780   MSRepositoryTable::unuse();
01781   if (iMetData) {
01782     iMetData->release();
01783     iMetData = NULL;
01784   }
01785   iMetDataSize = 0;
01786 }
01787 
01788 
01789 void MSMetaDataTable::seqScanInit()
01790 {
01791   MSRepositoryTable::seqScanInit();
01792   iMetDataSize = 0;
01793   iMetDataPos = 0;
01794   iMetBlobRepo = 0;
01795   iMetBlobOffset = 0;
01796   iMetStateSaved = false;
01797 }
01798 
01799 void MSMetaDataTable::seqScanReset()
01800 {
01801   seqScanPos(iMetState);
01802   seqScanInit();
01803   iMetStateSaved = true;
01804 }
01805 
01806 int MSMetaDataTable::getRefLen()
01807 {
01808   return sizeof(iMetCurrentDataPos) + sizeof(iMetCurrentDataSize) + sizeof(iMetCurrentBlobRepo) + sizeof(iMetCurrentBlobOffset);
01809 }
01810 
01811 void MSMetaDataTable::seqScanPos(unsigned char *pos)
01812 {
01813   mi_int4store(pos, iMetCurrentDataPos); pos +=4;
01814   mi_int4store(pos, iMetCurrentDataSize);pos +=4; 
01815   mi_int4store(pos, iMetCurrentBlobRepo); pos +=4;
01816   mi_int8store(pos, iMetCurrentBlobOffset);
01817 }
01818 
01819 void MSMetaDataTable::seqScanRead(unsigned char *pos, char *buf)
01820 {
01821   iMetStateSaved = false;
01822   iMetDataPos = mi_uint4korr( pos); pos +=4;
01823   iMetDataSize = mi_uint4korr(pos); pos +=4;
01824   iMetBlobRepo = mi_uint4korr(pos); pos +=4;
01825   iMetBlobOffset = mi_uint8korr(pos);
01826   MSRepositoryTable::seqScanRead(iMetBlobRepo, iMetBlobOffset, buf);
01827 }
01828 
01829 bool MSMetaDataTable::seqScanNext(char *buf)
01830 {
01831   if (iMetStateSaved) {
01832     bool have_data;
01833     uint8_t *pos = iMetState;
01834     iMetDataPos = mi_uint4korr( pos); pos +=4;
01835     // Do not reset the meta data size.
01836     /*iMetDataSize = mi_uint4korr(pos); */pos +=4;
01837     iMetBlobRepo = mi_uint4korr(pos); pos +=4;
01838     iMetBlobOffset = mi_uint8korr(pos);
01839     iMetStateSaved = false;
01840     resetScan(true, &have_data, iMetBlobRepo);
01841   }
01842   
01843   iMetCurrentDataPos = iMetDataPos;
01844   iMetCurrentDataSize = iMetDataSize;
01845   
01846   return MSRepositoryTable::seqScanNext(buf);
01847 }
01848 
01849 bool MSMetaDataTable::resetScan(bool positioned, bool *have_data, uint32_t repo_index)
01850 {
01851   CSMutex       *lock;
01852   MSBlobHeadRec   blob;
01853   uint16_t        head_size;
01854   uint64_t        blob_size;
01855   size_t        mdata_size, mdata_offset;
01856   uint8_t       status;
01857 
01858   enter_();
01859   
01860   *have_data = false;
01861   if (!MSRepositoryTable::resetScan(positioned, repo_index))
01862     return_(false);
01863   
01864   retry_read:
01865   lock = iRepoFile->myRepo->getRepoLock(iRepoOffset);
01866   lock_(lock);
01867   if (iRepoFile->read(&blob, iRepoOffset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
01868     unlock_(lock);
01869     iRepoOffset = iRepoFileSize;
01870     return_(false);
01871   }
01872   
01873   head_size = CS_GET_DISK_2(blob.rb_head_size_2);
01874   blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
01875   mdata_size = CS_GET_DISK_2(blob.rb_mdata_size_2);
01876   mdata_offset = CS_GET_DISK_2(blob.rb_mdata_offset_2);
01877   
01878   status = CS_GET_DISK_1(blob.rb_status_1);
01879   if ((head_size < (mdata_offset + mdata_size)) || !VALID_BLOB_STATUS(status)) {
01880     /* Can't be true. Assume this is garbage! */
01881     unlock_(lock);
01882     iRepoOffset++;
01883     goto retry_read;
01884   }
01885 
01886   if (mdata_size && IN_USE_BLOB_STATUS(status)) {
01887     iMetData->setLength((uint32_t) mdata_size);
01888     if (iRepoFile->read(iMetData->getBuffer(0), iRepoOffset + mdata_offset, mdata_size, 0) < mdata_size) {
01889       unlock_(lock);
01890       iRepoOffset = iRepoFileSize;
01891       return_(false);
01892     }
01893 
01894     iMetBlobRepo = iRepoFile->myRepo->getRepoID();
01895     iMetBlobOffset = iRepoOffset;
01896 
01897     if (!positioned) 
01898       iMetDataPos = 0;
01899 
01900     iMetDataSize = mdata_size;
01901     
01902     // When ever the data position is reset the current location information
01903     // must also be reset to that it is consisent with the data position.
01904     iMetCurrentBlobOffset = iRepoOffset;
01905     iMetCurrentBlobRepo = iRepoIndex;   
01906     iMetCurrentDataPos = iMetDataPos;
01907     iMetCurrentDataSize = iMetDataSize;
01908     
01909     *have_data = true;
01910   }
01911   unlock_(lock);
01912   iRepoOffset += head_size + blob_size;
01913   return_(true);
01914 }
01915 
01916 bool MSMetaDataTable::returnRecord(char *buf)
01917 {
01918   bool have_data = false;
01919 
01920   if (resetScan(false, &have_data) && have_data)
01921     return(returnSubRecord(buf));
01922     
01923   return false;
01924 }
01925 
01926 bool MSMetaDataTable::nextRecord(char **name, char **value)
01927 {
01928   if (iMetDataPos < iMetDataSize) {
01929     char *data = iMetData->getBuffer(iMetDataPos);
01930     
01931     *name = data;
01932     data += strlen(*name) +1;
01933     *value = data;
01934     data += strlen(*value) +1;
01935     
01936     iMetDataPos += data - *name;
01937     ASSERT(iMetDataPos <= iMetDataSize);
01938     
01939     return true;    
01940   }
01941 
01942   return false;
01943   
01944 }
01945 
01946 bool MSMetaDataTable::returnSubRecord(char *buf)
01947 {
01948   char *name, *value;
01949   
01950   if (nextRecord(&name, &value)) {
01951     returnRow(name, value, buf);    
01952     return true;    
01953   }
01954 
01955   return false;
01956 }
01957 
01958 void MSMetaDataTable::returnRow(char *name, char *value, char *buf)
01959 {
01960   TABLE   *table = mySQLTable;
01961   Field   *curr_field;
01962   byte    *save;
01963   MY_BITMAP *save_write_set;
01964 
01965   save_write_set = table->write_set;
01966   table->write_set = NULL;
01967 
01968 #ifdef DRIZZLED
01969   memset(buf, 0xFF, table->getNullBytes());
01970 #else
01971   memset(buf, 0xFF, table->s->null_bytes);
01972 #endif
01973   for (Field **field=GET_TABLE_FIELDS(table) ; *field ; field++) {
01974     curr_field = *field;
01975     save = curr_field->ptr;
01976 #if MYSQL_VERSION_ID < 50114
01977     curr_field->ptr = (byte *) buf + curr_field->offset();
01978 #else
01979 #ifdef DRIZZLED
01980     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->getTable()->getInsertRecord());
01981 #else
01982     curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
01983 #endif
01984 #endif
01985     switch (curr_field->field_name[0]) {
01986       case 'R':
01987         switch (curr_field->field_name[6]) {
01988           case 't':
01989             // Repository_id     INT
01990             ASSERT(strcmp(curr_field->field_name, "Repository_id") == 0);
01991             curr_field->store(iRepoFile->myRepo->getRepoID(), true);
01992             setNotNullInRecord(curr_field, buf);
01993             break;
01994           case 'l':
01995             // Repo_blob_offset  BIGINT
01996             ASSERT(strcmp(curr_field->field_name, "Repo_blob_offset") == 0);
01997             curr_field->store(iMetCurrentBlobOffset, true);
01998             setNotNullInRecord(curr_field, buf);
01999             break;
02000         }
02001         break;
02002       case 'N':
02003         // Name        
02004         ASSERT(strcmp(curr_field->field_name, "Name") == 0);
02005         curr_field->store(name, strlen(name), &UTF8_CHARSET);
02006         setNotNullInRecord(curr_field, buf);
02007         break;
02008       case 'V':
02009         // Value        
02010         ASSERT(strcmp(curr_field->field_name, "Value") == 0);
02011         curr_field->store(value, strlen(value), &my_charset_utf8_bin);
02012         setNotNullInRecord(curr_field, buf);
02013         break;
02014     }
02015     curr_field->ptr = save;
02016   }
02017 
02018   table->write_set = save_write_set;
02019 }
02020 
02021 
02022 #ifdef HAVE_ALIAS_SUPPORT
02023 bool MSMetaDataTable::matchAlias(uint32_t repo_id, uint64_t offset, const char *alias)
02024 {
02025   bool matched = false, have_data = false;
02026   enter_();
02027   
02028   if (resetScan(true, &have_data, repo_id) && have_data) {
02029     const char *blob_alias;
02030     MetaData md(iMetData->getBuffer(0), iMetDataSize);
02031     
02032     blob_alias = md.findAlias();
02033     matched = (blob_alias && !my_strcasecmp(&UTF8_CHARSET, blob_alias, alias));
02034   }
02035   
02036   return_(matched);
02037 }
02038 #endif
02039 
02040 void MSMetaDataTable::insertRow(char *buf)
02041 {
02042   uint32_t repo_index;
02043   String meta_name, meta_value; 
02044   uint16_t data_len;
02045   uint64_t repo_offset;
02046   char *data;   
02047   bool have_data, reset_alias = false;
02048   
02049   enter_();
02050   
02051   // Metadata inserts are ignored during reovery.
02052   // They will be restored from the dump table.
02053   if (myShare->mySysDatabase->isRecovering())
02054     exit_();
02055     
02056   seqScanReset();
02057 
02058   getFieldValue(buf, 0, &repo_index);
02059   getFieldValue(buf, 1, &repo_offset);
02060   getFieldValue(buf, 2, &meta_name);
02061   getFieldValue(buf, 3, &meta_value);
02062   
02063   if (!repo_index)
02064     CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
02065     
02066   iRepoOffset = repo_offset;
02067   if (!resetScan(true, &have_data, repo_index -1))
02068     CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
02069   
02070   const char *alias = NULL, *tag = meta_name.c_ptr_safe();
02071   
02072   if (iMetDataSize) {
02073     MetaData md(iMetData->getBuffer(0), iMetDataSize);
02074     // Check to see it this is a duplicate name.
02075     if (md.findName(tag))
02076       CSException::throwException(CS_CONTEXT, HA_ERR_FOUND_DUPP_KEY, "Meta data tag already exists.");
02077       
02078 #ifdef HAVE_ALIAS_SUPPORT
02079     alias = md.findAlias();
02080 #endif
02081   }
02082   
02083   // Create the meta data record.
02084 #ifdef HAVE_ALIAS_SUPPORT
02085   if (alias)
02086     data = iMetData->getBuffer(0); // This is to be able to test if the alias pointer needs to be reset.
02087   else
02088 #endif
02089     data = NULL;
02090     
02091   iMetData->setLength(iMetDataSize + meta_name.length() + meta_value.length()  + 2);
02092   
02093 #ifdef HAVE_ALIAS_SUPPORT
02094   if (alias && (data != iMetData->getBuffer(0))) // The buffer moved, adjust the alias.
02095     alias += (iMetData->getBuffer(0) - data);
02096 #endif
02097   
02098   data = iMetData->getBuffer(0);
02099   data_len = iMetDataSize;
02100   
02101 #ifdef HAVE_ALIAS_SUPPORT
02102   if ((!alias) && !my_strcasecmp(&UTF8_CHARSET, MS_ALIAS_TAG, tag)) {
02103     reset_alias = true;
02104     memcpy(data + data_len, MS_ALIAS_TAG, meta_name->length()); // Use my alias tag so we do not need to wory about case.
02105     alias = data + data_len + meta_name->length() + 1; // Set the alias to the value location.
02106   } else 
02107 #endif
02108     memcpy(data + data_len, meta_name.c_ptr_quick(), meta_name.length());
02109     
02110   data_len += meta_name.length();
02111   data[data_len] = 0;
02112   data_len++;
02113 
02114   memcpy(data + data_len, meta_value.c_ptr_quick(), meta_value.length());
02115   data_len += meta_value.length();
02116   data[data_len] = 0;
02117   data_len++;
02118   
02119   // Update the blob header with the new meta data.
02120   MSOpenTable *otab = MSOpenTable::newOpenTable(NULL);
02121   push_(otab);
02122   iRepoFile->setBlobMetaData(otab, repo_offset, data, data_len, reset_alias, alias);
02123   release_(otab);
02124     
02125   exit_();
02126 }
02127 /*
02128 insert into pbms_mata_data values(1, 921, "ATAG 3", "xx");
02129 insert into pbms_mata_data values(1, 921, "ATAG 1", "A VALUE 1");
02130 insert into pbms_mata_data values(1, 921, "ATAG 2", "xx");
02131 insert into pbms_mata_data values(1, 383, "ATAG 2", "xx");
02132 select * from pbms_mata_data;
02133  
02134 delete from pbms_mata_data where value = "xx";
02135 select * from pbms_mata_data;
02136 
02137 update pbms_mata_data set value = "!!" where name = "ATAG 3";
02138 update pbms_mata_data set Repo_blob_offset = 383 where value = "!!";
02139 
02140 delete from pbms_mata_data where Repo_blob_offset = 921;
02141 */
02142 //insert into pbms_mata_data values(1, 921, "blob_ALIAs", "My_alias");
02143 //select * from pbms_mata_data;
02144 //select * from pbms_repository;
02145 
02146 
02147 void MSMetaDataTable::deleteRow(char *buf)
02148 {
02149   uint32_t repo_index;
02150   String meta_name, meta_value; 
02151   uint16_t record_size;
02152   uint64_t repo_offset;
02153   char *data;
02154   bool have_data, reset_alias = false;
02155   
02156   enter_();
02157   
02158   seqScanReset();
02159 
02160   getFieldValue(buf, 0, &repo_index);
02161   getFieldValue(buf, 1, &repo_offset);
02162   getFieldValue(buf, 2, &meta_name);
02163   getFieldValue(buf, 3, &meta_value);
02164 
02165   if (!repo_index)
02166     CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
02167     
02168   iRepoOffset = repo_offset;
02169   if (!resetScan(true, &have_data, repo_index -1))
02170     CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
02171   
02172   const char *alias = NULL, *value = NULL, *tag = meta_name.c_ptr_safe();
02173   char *location;
02174   
02175   // Check to see name exists.
02176   MetaData md;
02177 
02178   md.use_data(iMetData->getBuffer(0), iMetDataSize);
02179   if (iMetDataSize) 
02180     value = md.findName(tag);
02181   
02182   if (value == NULL)
02183     CSException::throwException(CS_CONTEXT, HA_ERR_KEY_NOT_FOUND, "Meta data tag dosn't exists.");
02184       
02185 #ifdef HAVE_ALIAS_SUPPORT
02186   alias = md.findAlias();
02187   
02188   if (alias == value) {
02189     reset_alias = true;
02190     alias = NULL;
02191   }
02192 #endif
02193   
02194   // Create the meta data record.
02195   data = md.getBuffer();
02196   location = md.findNamePosition(tag);
02197   record_size = MetaData::recSize(location);
02198   iMetDataSize -= record_size;
02199   memmove(location, location + record_size, iMetDataSize - (location - data)); // Shift the meta data down
02200 
02201 #ifdef HAVE_ALIAS_SUPPORT
02202   // Get the alias again incase it moved.
02203   if (alias)
02204     alias = md.findAlias();
02205 #endif
02206   
02207   // Update the blob header with the new meta data.
02208   MSOpenTable *otab = MSOpenTable::newOpenTable(NULL);
02209   push_(otab);
02210   iRepoFile->setBlobMetaData(otab, repo_offset, data, iMetDataSize, reset_alias, alias);
02211   release_(otab);
02212 
02213   exit_();
02214 }
02215 
02216 class UpdateRowCleanUp : public CSRefObject {
02217   bool do_cleanup;
02218   MSMetaDataTable *tab;
02219   char *row_data;
02220   
02221   uint32_t ref_id;
02222 
02223   public:
02224   
02225   UpdateRowCleanUp(): CSRefObject(),
02226     do_cleanup(false), tab(NULL), row_data(NULL){}
02227     
02228   ~UpdateRowCleanUp() 
02229   {
02230     if (do_cleanup) {
02231       tab->deleteRow(row_data);
02232     }
02233   }
02234   
02235   void setCleanUp(MSMetaDataTable *table, char *data)
02236   {
02237     row_data = data;
02238     tab = table;
02239     do_cleanup = true;
02240   }
02241   
02242   void cancelCleanUp()
02243   {
02244     do_cleanup = false;
02245   }
02246   
02247 };
02248 
02249 void MSMetaDataTable::updateRow(char *old_data, char *new_data)
02250 {
02251   uint32_t o_repo_index, n_repo_index;
02252   String n_meta_name, n_meta_value; 
02253   String o_meta_name, o_meta_value; 
02254   uint16_t record_size;
02255   uint64_t o_repo_offset, n_repo_offset;
02256   char *data; 
02257   bool have_data, reset_alias = false;
02258   
02259   enter_();
02260   
02261   seqScanReset();
02262 
02263   getFieldValue(new_data, 0, &n_repo_index);
02264   getFieldValue(new_data, 1, &n_repo_offset);
02265   getFieldValue(new_data, 2, &n_meta_name);
02266   getFieldValue(new_data, 3, &n_meta_value);
02267 
02268   getFieldValue(old_data, 0, &o_repo_index);
02269   getFieldValue(old_data, 1, &o_repo_offset);
02270   getFieldValue(old_data, 2, &o_meta_name);
02271   getFieldValue(old_data, 3, &o_meta_value);
02272 
02273   if ((!o_repo_index) || (!n_repo_index))
02274     CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id");
02275   
02276   // If the meta data is not for the same BLOB then we do an insert and delete.
02277   if ((n_repo_index != o_repo_index) || (n_repo_offset != o_repo_offset)) {
02278     UpdateRowCleanUp *cleanup;
02279     new_(cleanup, UpdateRowCleanUp());
02280     push_(cleanup);
02281 
02282     insertRow(new_data);
02283     
02284     cleanup->setCleanUp(this, new_data);
02285     
02286     deleteRow(old_data);
02287     
02288     cleanup->cancelCleanUp();
02289     release_(cleanup);
02290     
02291     exit_();
02292   }
02293   
02294   iRepoOffset = n_repo_offset;
02295   if (!resetScan(true, &have_data, n_repo_index -1))
02296     CSException::throwException(CS_CONTEXT, HA_ERR_CANNOT_ADD_FOREIGN, "Invalid Repository_id or Repo_blob_offset");
02297   
02298   char *location;
02299   const char *value, *alias = NULL, *n_tag = n_meta_name.c_ptr_safe(), *o_tag = o_meta_name.c_ptr_safe();
02300   
02301   if (!my_strcasecmp(&UTF8_CHARSET, o_tag, n_tag))
02302     n_tag = NULL;
02303     
02304   MetaData md;
02305 
02306   md.use_data(iMetData->getBuffer(0), iMetDataSize);
02307   
02308   if ((!iMetDataSize) || ((value = md.findName(o_tag)) == NULL))
02309     CSException::throwException(CS_CONTEXT, HA_ERR_KEY_NOT_FOUND, "Meta data tag dosn't exists.");
02310       
02311   if (n_tag && (md.findName(n_tag) != NULL))
02312     CSException::throwException(CS_CONTEXT, HA_ERR_FOUND_DUPP_KEY, "Meta data tag already exists.");
02313     
02314 #ifdef HAVE_ALIAS_SUPPORT
02315   alias = md.findAlias();
02316 
02317   if (alias == value) {
02318     reset_alias = true;
02319     alias = NULL; // The alias is being deleted.
02320   }
02321 #endif
02322   
02323   if (!n_tag)
02324     n_tag = o_tag;
02325     
02326   // Create the meta data record.
02327   data = md.getBuffer();
02328   location = md.findNamePosition(o_tag);
02329   record_size = MetaData::recSize(location);
02330   iMetDataSize -= record_size;
02331   memmove(location, location + record_size, iMetDataSize - (location - data)); // Shift the meta data down
02332   
02333   // Add the updated meta data to the end of the block.
02334   iMetData->setLength(iMetDataSize + n_meta_name.length() + n_meta_value.length()  + 2);
02335   
02336   md.use_data(iMetData->getBuffer(0), iMetDataSize); // Reset this incase the buffer moved.
02337 #ifdef HAVE_ALIAS_SUPPORT
02338   // Get the alias again incase it moved.
02339   if (alias)
02340     alias = md.findAlias();
02341 #endif
02342   
02343   data = iMetData->getBuffer(0);
02344     
02345 #ifdef HAVE_ALIAS_SUPPORT
02346   if ((!alias) && !my_strcasecmp(&UTF8_CHARSET, MS_ALIAS_TAG, n_tag)) {
02347     reset_alias = true;
02348     memcpy(data + iMetDataSize, MS_ALIAS_TAG, n_meta_name.length()); // Use my alias tag so we do not need to wory about case.
02349     alias = data + iMetDataSize + n_meta_name.length() + 1; // Set the alias to the value location.
02350   } else 
02351 #endif
02352     memcpy(data + iMetDataSize, n_meta_name.c_ptr_quick(), n_meta_name.length());
02353     
02354   iMetDataSize += n_meta_name.length();
02355   data[iMetDataSize] = 0;
02356   iMetDataSize++;
02357 
02358   memcpy(data + iMetDataSize, n_meta_value.c_ptr_quick(), n_meta_value.length());
02359   iMetDataSize += n_meta_value.length();
02360   data[iMetDataSize] = 0;
02361   iMetDataSize++;
02362   
02363   
02364   // Update the blob header with the new meta data.
02365   MSOpenTable *otab = MSOpenTable::newOpenTable(NULL);
02366   push_(otab);
02367   iRepoFile->setBlobMetaData(otab, n_repo_offset, data, iMetDataSize, reset_alias, alias);
02368   release_(otab);
02369     
02370   exit_();
02371 }
02372 
02373 /*
02374  * -------------------------------------------------------------------------
02375  * SYSTEM TABLE SHARES
02376  */
02377 
02378 CSSyncSortedList *MSSystemTableShare::gSystemTableList;
02379 
02380 MSSystemTableShare::MSSystemTableShare():
02381 CSRefObject(),
02382 myTablePath(NULL),
02383 mySysDatabase(NULL),
02384 iOpenCount(0)
02385 {
02386   thr_lock_init(&myThrLock);
02387 }
02388 
02389 MSSystemTableShare::~MSSystemTableShare()
02390 {
02391 #ifdef DRIZZLED
02392   myThrLock.deinit();
02393 #else
02394   thr_lock_delete(&myThrLock);
02395 #endif
02396   if (myTablePath) {
02397     myTablePath->release();
02398     myTablePath = NULL;
02399   }
02400   if (mySysDatabase) {
02401     mySysDatabase->release();
02402     mySysDatabase = NULL;
02403   }
02404 }
02405 
02406 CSObject *MSSystemTableShare::getKey()
02407 {
02408   return (CSObject *) myTablePath;
02409 }
02410 
02411 int MSSystemTableShare::compareKey(CSObject *key)
02412 {
02413   return myTablePath->compare((CSString *) key);
02414 }
02415 
02416 void MSSystemTableShare::startUp()
02417 {
02418   new_(gSystemTableList, CSSyncSortedList);
02419 }
02420 
02421 void MSSystemTableShare::shutDown()
02422 {
02423   if (gSystemTableList) {
02424     gSystemTableList->release();
02425     gSystemTableList = NULL;
02426   }
02427 }
02428 
02429 MSOpenSystemTable *MSSystemTableShare::openSystemTable(const char *table_path, TABLE *table)
02430 {
02431   CSString      *table_url;
02432   MSSystemTableShare  *share;
02433   MSOpenSystemTable *otab = NULL;
02434   SysTableType    table_type;
02435 
02436   enter_();
02437   
02438   table_type =  pbms_systable_type(cs_last_name_of_path(table_path));
02439   if (table_type == SYS_UNKNOWN)
02440     CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, "Table not found");
02441   
02442   table_path = cs_last_name_of_path(table_path, 2);
02443   table_url = CSString::newString(table_path);
02444   push_(table_url);
02445 
02446   lock_(gSystemTableList);
02447   if (!(share = (MSSystemTableShare *) gSystemTableList->find(table_url))) {
02448     share = MSSystemTableShare::newTableShare(RETAIN(table_url));
02449     gSystemTableList->add(share);
02450   }
02451   
02452   switch (table_type) {
02453     case SYS_REP:
02454       new_(otab, MSRepositoryTable(share, table));
02455       break;
02456     case SYS_REF:
02457       new_(otab, MSReferenceTable(share, table));
02458       break;
02459     case SYS_BLOB:
02460       new_(otab, MSBlobDataTable(share, table));
02461       break;
02462     case SYS_DUMP:
02463       new_(otab, MSDumpTable(share, table));
02464       break;
02465     case SYS_META:
02466       new_(otab, MSMetaDataTable(share, table));
02467       break;
02468     case SYS_HTTP:
02469       new_(otab, MSHTTPHeaderTable(share, table));
02470       break;
02471 #ifdef HAVE_ALIAS_SUPPORT
02472     case SYS_ALIAS:
02473       new_(otab, MSBlobAliasTable(share, table));
02474       break;
02475 #endif
02476     case SYS_VARIABLE:
02477       new_(otab, MSVariableTable(share, table));
02478       break;
02479     case SYS_CLOUD:
02480       new_(otab, MSCloudTable(share, table));
02481       break;
02482     case SYS_BACKUP:
02483       new_(otab, MSBackupTable(share, table));
02484       break;
02485 #ifndef DRIZZLED
02486     case SYS_ENABLED:
02487       new_(otab, MSEnabledTable(share, table));
02488       break;
02489 #endif
02490     case SYS_UNKNOWN:
02491       break;
02492   }
02493   
02494   share->iOpenCount++;
02495   unlock_(gSystemTableList);
02496 
02497   release_(table_url);
02498   return_(otab);
02499 }
02500 
02501 void MSSystemTableShare::removeDatabaseSystemTables(MSDatabase *doomed_db)
02502 {
02503   MSSystemTableShare  *share;
02504   uint32_t i= 0;
02505   enter_();
02506   
02507   push_(doomed_db);
02508   lock_(gSystemTableList);
02509   while ((share = (MSSystemTableShare *) gSystemTableList->itemAt(i))) {
02510     if (share->mySysDatabase == doomed_db) {
02511       gSystemTableList->remove(share->myTablePath);
02512     } else
02513       i++;
02514   }
02515   
02516   unlock_(gSystemTableList);
02517   release_(doomed_db);
02518   exit_();
02519 }
02520 
02521 void MSSystemTableShare::releaseSystemTable(MSOpenSystemTable *tab)
02522 {
02523   enter_();
02524   lock_(gSystemTableList);
02525   tab->myShare->iOpenCount--;
02526   if (!tab->myShare->iOpenCount) {
02527     gSystemTableList->remove(tab->myShare->myTablePath);
02528   }
02529   unlock_(gSystemTableList);
02530   exit_();
02531 }
02532 
02533 MSSystemTableShare *MSSystemTableShare::newTableShare(CSString *table_path)
02534 {
02535   MSSystemTableShare *tab;
02536 
02537   enter_();
02538   if (!(tab = new MSSystemTableShare())) {
02539     table_path->release();
02540     CSException::throwOSError(CS_CONTEXT, ENOMEM);
02541   }
02542   push_(tab);
02543   tab->myTablePath = table_path;
02544   tab->mySysDatabase = MSDatabase::getDatabase(table_path->left("/", -1), true);
02545   pop_(tab);
02546   return_(tab);
02547 }
02548 
02549 void PBMSSystemTables::systemTablesStartUp()
02550 {
02551   MSCloudTable::startUp();
02552   MSBackupTable::startUp();
02553 }
02554 
02555 void PBMSSystemTables::systemTableShutDown()
02556 {
02557   MSBackupTable::shutDown();
02558   MSCloudTable::shutDown();
02559 }
02560