Drizzled Public API Documentation

haildb_engine.cc
00001 /*
00002   Copyright (C) 2010 Stewart Smith
00003 
00004   This program is free software; you can redistribute it and/or
00005   modify it under the terms of the GNU General Public License
00006   as published by the Free Software Foundation; either version 2
00007   of the License, or (at your option) any later version.
00008 
00009   This program is distributed in the hope that it will be useful,
00010   but WITHOUT ANY WARRANTY; without even the implied warranty of
00011   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012   GNU General Public License for more details.
00013 
00014   You should have received a copy of the GNU General Public License
00015   along with this program; if not, write to the Free Software
00016   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
00017 */
00018 
00019 /* innobase_get_int_col_max_value() comes from ha_innodb.cc which is under
00020    the following license and Copyright */
00021 
00022 /*****************************************************************************
00023 
00024 Copyright (C) 2000, 2009, MySQL AB & Innobase Oy. All Rights Reserved.
00025 Copyright (C) 2008, 2009 Google Inc.
00026 
00027 Portions of this file contain modifications contributed and copyrighted by
00028 Google, Inc. Those modifications are gratefully acknowledged and are described
00029 briefly in the InnoDB documentation. The contributions by Google are
00030 incorporated with their permission, and subject to the conditions contained in
00031 the file COPYING.Google.
00032 
00033 This program is free software; you can redistribute it and/or modify it under
00034 the terms of the GNU General Public License as published by the Free Software
00035 Foundation; version 2 of the License.
00036 
00037 This program is distributed in the hope that it will be useful, but WITHOUT
00038 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00039 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
00040 
00041 You should have received a copy of the GNU General Public License along with
00042 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
00043 St, Fifth Floor, Boston, MA 02110-1301 USA
00044 
00045 *****************************************************************************/
00046 /***********************************************************************
00047 
00048 Copyright (C) 1995, 2009, Innobase Oy. All Rights Reserved.
00049 Copyright (C) 2009, Percona Inc.
00050 
00051 Portions of this file contain modifications contributed and copyrighted
00052 by Percona Inc.. Those modifications are
00053 gratefully acknowledged and are described briefly in the InnoDB
00054 documentation. The contributions by Percona Inc. are incorporated with
00055 their permission, and subject to the conditions contained in the file
00056 COPYING.Percona.
00057 
00058 This program is free software; you can redistribute it and/or modify it
00059 under the terms of the GNU General Public License as published by the
00060 Free Software Foundation; version 2 of the License.
00061 
00062 This program is distributed in the hope that it will be useful, but
00063 WITHOUT ANY WARRANTY; without even the implied warranty of
00064 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
00065 Public License for more details.
00066 
00067 You should have received a copy of the GNU General Public License along
00068 with this program; if not, write to the Free Software Foundation, Inc.,
00069 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00070 
00071 ***********************************************************************/
00072 
00073 
00074 #include <config.h>
00075 #include <drizzled/table.h>
00076 #include <drizzled/error.h>
00077 #include <drizzled/internal/my_pthread.h>
00078 #include <drizzled/plugin/transactional_storage_engine.h>
00079 #include <drizzled/plugin/error_message.h>
00080 
00081 #include <fcntl.h>
00082 #include <stdarg.h>
00083 
00084 #include <string>
00085 #include <boost/algorithm/string.hpp>
00086 #include <boost/unordered_set.hpp>
00087 #include <boost/foreach.hpp>
00088 #include <map>
00089 #include <fstream>
00090 #include <drizzled/message/table.pb.h>
00091 #include <drizzled/internal/m_string.h>
00092 
00093 #include <drizzled/global_charset_info.h>
00094 
00095 #include "haildb_datadict_dump_func.h"
00096 #include "config_table_function.h"
00097 #include "status_table_function.h"
00098 
00099 #include <haildb.h>
00100 
00101 #include "haildb_engine.h"
00102 
00103 #include <drizzled/field.h>
00104 #include <drizzled/field/blob.h>
00105 #include <drizzled/field/enum.h>
00106 #include <drizzled/session.h>
00107 #include <drizzled/module/option_map.h>
00108 #include <drizzled/charset.h>
00109 #include <drizzled/current_session.h>
00110 #include <drizzled/key.h>
00111 #include <drizzled/sql_lex.h>
00112 
00113 #include <iostream>
00114 
00115 namespace po= boost::program_options;
00116 #include <boost/program_options.hpp>
00117 #include <boost/algorithm/string.hpp>
00118 
00119 using namespace std;
00120 using namespace google;
00121 using namespace drizzled;
00122 
00123 int read_row_from_haildb(Session *session, unsigned char* buf, ib_crsr_t cursor, ib_tpl_t tuple, Table* table, bool has_hidden_primary_key, uint64_t *hidden_pkey, drizzled::memory::Root **blobroot= NULL);
00124 static void fill_ib_search_tpl_from_drizzle_key(ib_tpl_t search_tuple,
00125                                                 const drizzled::KeyInfo *key_info,
00126                                                 const unsigned char *key_ptr,
00127                                                 uint32_t key_len);
00128 static void store_key_value_from_haildb(KeyInfo *key_info, unsigned char* ref, int ref_len, const unsigned char *record);
00129 
00130 #define HAILDB_EXT ".EID"
00131 
00132 const char HAILDB_TABLE_DEFINITIONS_TABLE[]= "data_dictionary/haildb_table_definitions";
00133 const string statement_savepoint_name("STATEMENT");
00134 
00135 static boost::unordered_set<std::string> haildb_system_table_names;
00136 
00137 
00138 static const char *HailDBCursor_exts[] = {
00139   NULL
00140 };
00141 
00142 class HailDBEngine : public drizzled::plugin::TransactionalStorageEngine
00143 {
00144 public:
00145   HailDBEngine(const string &name_arg)
00146    : drizzled::plugin::TransactionalStorageEngine(name_arg,
00147                                                   HTON_NULL_IN_KEY |
00148                                                   HTON_CAN_INDEX_BLOBS |
00149                                                   HTON_AUTO_PART_KEY |
00150                                                   HTON_PARTIAL_COLUMN_READ |
00151                                                   HTON_HAS_DOES_TRANSACTIONS)
00152   {
00153     table_definition_ext= HAILDB_EXT;
00154   }
00155 
00156   ~HailDBEngine();
00157 
00158   virtual Cursor *create(Table &table)
00159   {
00160     return new HailDBCursor(*this, table);
00161   }
00162 
00163   const char **bas_ext() const {
00164     return HailDBCursor_exts;
00165   }
00166 
00167   bool validateCreateTableOption(const std::string &key,
00168                                  const std::string &state);
00169 
00170   int doCreateTable(Session&,
00171                     Table& table_arg,
00172                     const drizzled::identifier::Table &identifier,
00173                     const drizzled::message::Table& proto);
00174 
00175   int doDropTable(Session&, const identifier::Table &identifier);
00176 
00177   int doRenameTable(drizzled::Session&,
00178                     const drizzled::identifier::Table&,
00179                     const drizzled::identifier::Table&);
00180 
00181   int doGetTableDefinition(Session& session,
00182                            const identifier::Table &identifier,
00183                            drizzled::message::Table &table_proto);
00184 
00185   bool doDoesTableExist(Session&, const identifier::Table &identifier);
00186 
00187 private:
00188   void getTableNamesInSchemaFromHailDB(const drizzled::identifier::Schema &schema,
00189                                        drizzled::plugin::TableNameList *set_of_names,
00190                                        drizzled::identifier::Table::vector *identifiers);
00191 
00192 public:
00193   void doGetTableIdentifiers(drizzled::CachedDirectory &,
00194                              const drizzled::identifier::Schema &schema,
00195                              drizzled::identifier::Table::vector &identifiers);
00196 
00197   /* The following defines can be increased if necessary */
00198   uint32_t max_supported_keys()          const { return 1000; }
00199   uint32_t max_supported_key_length()    const { return 3500; }
00200   uint32_t max_supported_key_part_length() const { return 767; }
00201 
00202   uint32_t index_flags(enum  ha_key_alg) const
00203   {
00204     return (HA_READ_NEXT |
00205             HA_READ_PREV |
00206             HA_READ_RANGE |
00207             HA_READ_ORDER |
00208             HA_KEYREAD_ONLY);
00209   }
00210   virtual int doStartTransaction(Session *session,
00211                                  start_transaction_option_t options);
00212   virtual void doStartStatement(Session *session);
00213   virtual void doEndStatement(Session *session);
00214 
00215   virtual int doSetSavepoint(Session* session,
00216                                  drizzled::NamedSavepoint &savepoint);
00217   virtual int doRollbackToSavepoint(Session* session,
00218                                      drizzled::NamedSavepoint &savepoint);
00219   virtual int doReleaseSavepoint(Session* session,
00220                                      drizzled::NamedSavepoint &savepoint);
00221   virtual int doCommit(Session* session, bool all);
00222   virtual int doRollback(Session* session, bool all);
00223 
00224   typedef std::map<std::string, HailDBTableShare*> HailDBMap;
00225   HailDBMap haildb_open_tables;
00226   HailDBTableShare *findOpenTable(const std::string table_name);
00227   void addOpenTable(const std::string &table_name, HailDBTableShare *);
00228   void deleteOpenTable(const std::string &table_name);
00229 
00230   uint64_t getInitialAutoIncrementValue(HailDBCursor *cursor);
00231   uint64_t getHiddenPrimaryKeyInitialAutoIncrementValue(HailDBCursor *cursor);
00232 
00233 };
00234 
00235 static drizzled::plugin::StorageEngine *haildb_engine= NULL;
00236 
00237 
00238 static ib_trx_t* get_trx(Session* session)
00239 {
00240   return (ib_trx_t*) session->getEngineData(haildb_engine);
00241 }
00242 
00243 /* This is a superset of the map from innobase plugin.
00244    Unlike innobase plugin we don't act on errors here, we just
00245    map error codes. */
00246 static int ib_err_t_to_drizzle_error(Session* session, ib_err_t err)
00247 {
00248   switch (err)
00249   {
00250   case DB_SUCCESS:
00251     return 0;
00252 
00253   case DB_ERROR:
00254   default:
00255     return -1;
00256 
00257   case DB_INTERRUPTED:
00258     return ER_QUERY_INTERRUPTED; // FIXME: is this correct?
00259 
00260   case DB_OUT_OF_MEMORY:
00261     return HA_ERR_OUT_OF_MEM;
00262 
00263   case DB_DUPLICATE_KEY:
00264     return HA_ERR_FOUND_DUPP_KEY;
00265 
00266   case DB_FOREIGN_DUPLICATE_KEY:
00267     return HA_ERR_FOREIGN_DUPLICATE_KEY;
00268 
00269   case DB_MISSING_HISTORY:
00270     return HA_ERR_TABLE_DEF_CHANGED;
00271 
00272   case DB_RECORD_NOT_FOUND:
00273     return HA_ERR_NO_ACTIVE_RECORD;
00274 
00275   case DB_DEADLOCK:
00276     /* HailDB will roll back a transaction itself due to DB_DEADLOCK.
00277        This means we have to tell Drizzle about it */
00278     session->markTransactionForRollback(true);
00279     return HA_ERR_LOCK_DEADLOCK;
00280 
00281   case DB_LOCK_WAIT_TIMEOUT:
00282     session->markTransactionForRollback(false);
00283     return HA_ERR_LOCK_WAIT_TIMEOUT;
00284 
00285   case DB_NO_REFERENCED_ROW:
00286     return HA_ERR_NO_REFERENCED_ROW;
00287 
00288   case DB_ROW_IS_REFERENCED:
00289     return HA_ERR_ROW_IS_REFERENCED;
00290 
00291   case DB_CANNOT_ADD_CONSTRAINT:
00292     return HA_ERR_CANNOT_ADD_FOREIGN;
00293 
00294   case DB_CANNOT_DROP_CONSTRAINT:
00295     return HA_ERR_ROW_IS_REFERENCED; /* misleading. should have new err code */
00296 
00297   case DB_COL_APPEARS_TWICE_IN_INDEX:
00298   case DB_CORRUPTION:
00299     return HA_ERR_CRASHED;
00300 
00301   case DB_MUST_GET_MORE_FILE_SPACE:
00302   case DB_OUT_OF_FILE_SPACE:
00303     return HA_ERR_RECORD_FILE_FULL;
00304 
00305   case DB_TABLE_IS_BEING_USED:
00306     return HA_ERR_WRONG_COMMAND;
00307 
00308   case DB_TABLE_NOT_FOUND:
00309     return HA_ERR_NO_SUCH_TABLE;
00310 
00311   case DB_TOO_BIG_RECORD:
00312     return HA_ERR_TO_BIG_ROW;
00313 
00314   case DB_NO_SAVEPOINT:
00315     return HA_ERR_NO_SAVEPOINT;
00316 
00317   case DB_LOCK_TABLE_FULL:
00318     return HA_ERR_LOCK_TABLE_FULL;
00319 
00320   case DB_PRIMARY_KEY_IS_NULL:
00321     return ER_PRIMARY_CANT_HAVE_NULL;
00322 
00323   case DB_TOO_MANY_CONCURRENT_TRXS:
00324     return HA_ERR_RECORD_FILE_FULL; /* need better error code */
00325 
00326   case DB_END_OF_INDEX:
00327     return HA_ERR_END_OF_FILE;
00328 
00329   case DB_UNSUPPORTED:
00330     return HA_ERR_UNSUPPORTED;
00331   }
00332 }
00333 
00334 static ib_trx_level_t tx_isolation_to_ib_trx_level(enum_tx_isolation level)
00335 {
00336   switch(level)
00337   {
00338   case ISO_REPEATABLE_READ:
00339     return IB_TRX_REPEATABLE_READ;
00340   case ISO_READ_COMMITTED:
00341     return IB_TRX_READ_COMMITTED;
00342   case ISO_SERIALIZABLE:
00343     return IB_TRX_SERIALIZABLE;
00344   case ISO_READ_UNCOMMITTED:
00345     return IB_TRX_READ_UNCOMMITTED;
00346   }
00347 
00348   assert(0);
00349   return IB_TRX_REPEATABLE_READ;
00350 }
00351 
00352 int HailDBEngine::doStartTransaction(Session *session,
00353                                              start_transaction_option_t options)
00354 {
00355   ib_trx_t *transaction;
00356   ib_trx_level_t isolation_level;
00357 
00358   (void)options;
00359 
00360   transaction= get_trx(session);
00361   isolation_level= tx_isolation_to_ib_trx_level(session->getTxIsolation());
00362   *transaction= ib_trx_begin(isolation_level);
00363 
00364   return *transaction == NULL;
00365 }
00366 
00367 void HailDBEngine::doStartStatement(Session *session)
00368 {
00369   if(*get_trx(session) == NULL)
00370     doStartTransaction(session, START_TRANS_NO_OPTIONS);
00371 
00372   ib_savepoint_take(*get_trx(session), statement_savepoint_name.c_str(),
00373                     statement_savepoint_name.length());
00374 }
00375 
00376 void HailDBEngine::doEndStatement(Session *)
00377 {
00378 }
00379 
00380 int HailDBEngine::doSetSavepoint(Session* session,
00381                                          drizzled::NamedSavepoint &savepoint)
00382 {
00383   ib_trx_t *transaction= get_trx(session);
00384   ib_savepoint_take(*transaction, savepoint.getName().c_str(),
00385                     savepoint.getName().length());
00386   return 0;
00387 }
00388 
00389 int HailDBEngine::doRollbackToSavepoint(Session* session,
00390                                                 drizzled::NamedSavepoint &savepoint)
00391 {
00392   ib_trx_t *transaction= get_trx(session);
00393   ib_err_t err;
00394 
00395   err= ib_savepoint_rollback(*transaction, savepoint.getName().c_str(),
00396                              savepoint.getName().length());
00397 
00398   return ib_err_t_to_drizzle_error(session, err);
00399 }
00400 
00401 int HailDBEngine::doReleaseSavepoint(Session* session,
00402                                              drizzled::NamedSavepoint &savepoint)
00403 {
00404   ib_trx_t *transaction= get_trx(session);
00405   ib_err_t err;
00406 
00407   err= ib_savepoint_release(*transaction, savepoint.getName().c_str(),
00408                             savepoint.getName().length());
00409   if (err != DB_SUCCESS)
00410     return ib_err_t_to_drizzle_error(session, err);
00411 
00412   return 0;
00413 }
00414 
00415 int HailDBEngine::doCommit(Session* session, bool all)
00416 {
00417   ib_err_t err;
00418   ib_trx_t *transaction= get_trx(session);
00419 
00420   if (all)
00421   {
00422     err= ib_trx_commit(*transaction);
00423 
00424     if (err != DB_SUCCESS)
00425       return ib_err_t_to_drizzle_error(session, err);
00426 
00427     *transaction= NULL;
00428   }
00429 
00430   return 0;
00431 }
00432 
00433 int HailDBEngine::doRollback(Session* session, bool all)
00434 {
00435   ib_err_t err;
00436   ib_trx_t *transaction= get_trx(session);
00437 
00438   if (all)
00439   {
00440     if (ib_trx_state(*transaction) == IB_TRX_NOT_STARTED)
00441       err= ib_trx_release(*transaction);
00442     else
00443       err= ib_trx_rollback(*transaction);
00444 
00445     if (err != DB_SUCCESS)
00446       return ib_err_t_to_drizzle_error(session, err);
00447 
00448     *transaction= NULL;
00449   }
00450   else
00451   {
00452     if (ib_trx_state(*transaction) == IB_TRX_NOT_STARTED)
00453       return 0;
00454 
00455     err= ib_savepoint_rollback(*transaction, statement_savepoint_name.c_str(),
00456                                statement_savepoint_name.length());
00457     if (err != DB_SUCCESS)
00458       return ib_err_t_to_drizzle_error(session, err);
00459   }
00460 
00461   return 0;
00462 }
00463 
00464 HailDBTableShare *HailDBEngine::findOpenTable(const string table_name)
00465 {
00466   HailDBMap::iterator find_iter=
00467     haildb_open_tables.find(table_name);
00468 
00469   if (find_iter != haildb_open_tables.end())
00470     return (*find_iter).second;
00471   else
00472     return NULL;
00473 }
00474 
00475 void HailDBEngine::addOpenTable(const string &table_name, HailDBTableShare *share)
00476 {
00477   haildb_open_tables[table_name]= share;
00478 }
00479 
00480 void HailDBEngine::deleteOpenTable(const string &table_name)
00481 {
00482   haildb_open_tables.erase(table_name);
00483 }
00484 
00485 static pthread_mutex_t haildb_mutex= PTHREAD_MUTEX_INITIALIZER;
00486 
00487 uint64_t HailDBCursor::getHiddenPrimaryKeyInitialAutoIncrementValue()
00488 {
00489   uint64_t nr;
00490   ib_err_t err;
00491   ib_trx_t transaction= *get_trx(getTable()->in_use);
00492   ib_cursor_attach_trx(cursor, transaction);
00493   tuple= ib_clust_read_tuple_create(cursor);
00494   err= ib_cursor_last(cursor);
00495   assert(err == DB_SUCCESS || err == DB_END_OF_INDEX); // Probably a FIXME
00496   err= ib_cursor_read_row(cursor, tuple);
00497   if (err == DB_RECORD_NOT_FOUND)
00498     nr= 1;
00499   else
00500   {
00501     assert (err == DB_SUCCESS);
00502     err= ib_tuple_read_u64(tuple, getTable()->getShare()->sizeFields(), &nr);
00503     nr++;
00504   }
00505   ib_tuple_delete(tuple);
00506   tuple= NULL;
00507   err= ib_cursor_reset(cursor);
00508   assert(err == DB_SUCCESS);
00509   return nr;
00510 }
00511 
00512 uint64_t HailDBCursor::getInitialAutoIncrementValue()
00513 {
00514   uint64_t nr;
00515   int error;
00516 
00517   (void) extra(HA_EXTRA_KEYREAD);
00518   getTable()->mark_columns_used_by_index_no_reset(getTable()->getShare()->next_number_index);
00519   doStartIndexScan(getTable()->getShare()->next_number_index, 1);
00520   if (getTable()->getShare()->next_number_keypart == 0)
00521   {           // Autoincrement at key-start
00522     error=index_last(getTable()->getUpdateRecord());
00523   }
00524   else
00525   {
00526     unsigned char key[MAX_KEY_LENGTH];
00527     key_copy(key, getTable()->getInsertRecord(),
00528              getTable()->key_info + getTable()->getShare()->next_number_index,
00529              getTable()->getShare()->next_number_key_offset);
00530     error= index_read_map(getTable()->getUpdateRecord(), key,
00531                           make_prev_keypart_map(getTable()->getShare()->next_number_keypart),
00532                           HA_READ_PREFIX_LAST);
00533   }
00534 
00535   if (error)
00536     nr=1;
00537   else
00538     nr= ((uint64_t) getTable()->found_next_number_field->
00539          val_int_offset(getTable()->getShare()->rec_buff_length)+1);
00540   doEndIndexScan();
00541   (void) extra(HA_EXTRA_NO_KEYREAD);
00542 
00543   if (getTable()->getShare()->getTableMessage()->options().auto_increment_value() > nr)
00544     nr= getTable()->getShare()->getTableMessage()->options().auto_increment_value();
00545 
00546   return nr;
00547 }
00548 
00549 HailDBTableShare::HailDBTableShare(const char* name, bool hidden_primary_key)
00550   : use_count(0), has_hidden_primary_key(hidden_primary_key)
00551 {
00552   table_name.assign(name);
00553 }
00554 
00555 uint64_t HailDBEngine::getInitialAutoIncrementValue(HailDBCursor *cursor)
00556 {
00557   doStartTransaction(current_session, START_TRANS_NO_OPTIONS);
00558   uint64_t initial_auto_increment_value= cursor->getInitialAutoIncrementValue();
00559   doCommit(current_session, true);
00560 
00561   return initial_auto_increment_value;
00562 }
00563 
00564 uint64_t HailDBEngine::getHiddenPrimaryKeyInitialAutoIncrementValue(HailDBCursor *cursor)
00565 {
00566   doStartTransaction(current_session, START_TRANS_NO_OPTIONS);
00567   uint64_t initial_auto_increment_value= cursor->getHiddenPrimaryKeyInitialAutoIncrementValue();
00568   doCommit(current_session, true);
00569 
00570   return initial_auto_increment_value;
00571 }
00572 
00573 HailDBTableShare *HailDBCursor::get_share(const char *table_name, bool has_hidden_primary_key, int *rc)
00574 {
00575   pthread_mutex_lock(&haildb_mutex);
00576 
00577   HailDBEngine *a_engine= static_cast<HailDBEngine *>(getEngine());
00578   share= a_engine->findOpenTable(table_name);
00579 
00580   if (!share)
00581   {
00582     share= new HailDBTableShare(table_name, has_hidden_primary_key);
00583 
00584     if (share == NULL)
00585     {
00586       pthread_mutex_unlock(&haildb_mutex);
00587       *rc= HA_ERR_OUT_OF_MEM;
00588       return(NULL);
00589     }
00590 
00591     if (getTable()->found_next_number_field)
00592     {
00593       share->auto_increment_value.fetch_and_store(
00594                                   a_engine->getInitialAutoIncrementValue(this));
00595 
00596     }
00597 
00598     if (has_hidden_primary_key)
00599     {
00600       uint64_t hidden_pkey= 0;
00601       hidden_pkey= a_engine->getHiddenPrimaryKeyInitialAutoIncrementValue(this);
00602       share->hidden_pkey_auto_increment_value.fetch_and_store(hidden_pkey);
00603     }
00604 
00605     a_engine->addOpenTable(share->table_name, share);
00606     thr_lock_init(&share->lock);
00607   }
00608   share->use_count++;
00609 
00610   pthread_mutex_unlock(&haildb_mutex);
00611 
00612   return(share);
00613 }
00614 
00615 int HailDBCursor::free_share()
00616 {
00617   pthread_mutex_lock(&haildb_mutex);
00618   if (!--share->use_count)
00619   {
00620     HailDBEngine *a_engine= static_cast<HailDBEngine *>(getEngine());
00621     a_engine->deleteOpenTable(share->table_name);
00622     delete share;
00623   }
00624   pthread_mutex_unlock(&haildb_mutex);
00625 
00626   return 0;
00627 }
00628 
00629 
00630 THR_LOCK_DATA **HailDBCursor::store_lock(Session *session,
00631                                                  THR_LOCK_DATA **to,
00632                                                  thr_lock_type lock_type)
00633 {
00634   /* Currently, we can get a transaction start by ::store_lock
00635      instead of beginTransaction, startStatement.
00636 
00637      See https://bugs.launchpad.net/drizzle/+bug/535528
00638 
00639      all stemming from the transactional engine interface needing
00640      a severe amount of immodium.
00641    */
00642 
00643   if(*get_trx(session) == NULL)
00644   {
00645     static_cast<HailDBEngine*>(getEngine())->
00646                     doStartTransaction(session, START_TRANS_NO_OPTIONS);
00647   }
00648 
00649   if (lock_type != TL_UNLOCK)
00650   {
00651     ib_savepoint_take(*get_trx(session), statement_savepoint_name.c_str(),
00652                       statement_savepoint_name.length());
00653   }
00654 
00655   /* the below is adapted from ha_innodb.cc */
00656 
00657   const uint32_t sql_command = session->getSqlCommand();
00658 
00659   if (sql_command == SQLCOM_DROP_TABLE) {
00660 
00661     /* MySQL calls this function in DROP Table though this table
00662     handle may belong to another session that is running a query.
00663     Let us in that case skip any changes to the prebuilt struct. */ 
00664 
00665   } else if (lock_type == TL_READ_WITH_SHARED_LOCKS
00666        || lock_type == TL_READ_NO_INSERT
00667        || (lock_type != TL_IGNORE
00668            && sql_command != SQLCOM_SELECT)) {
00669 
00670     /* The OR cases above are in this order:
00671     1) MySQL is doing LOCK TABLES ... READ LOCAL, or we
00672     are processing a stored procedure or function, or
00673     2) (we do not know when TL_READ_HIGH_PRIORITY is used), or
00674     3) this is a SELECT ... IN SHARE MODE, or
00675     4) we are doing a complex SQL statement like
00676     INSERT INTO ... SELECT ... and the logical logging (MySQL
00677     binlog) requires the use of a locking read, or
00678     MySQL is doing LOCK TABLES ... READ.
00679     5) we let InnoDB do locking reads for all SQL statements that
00680     are not simple SELECTs; note that select_lock_type in this
00681     case may get strengthened in ::external_lock() to LOCK_X.
00682     Note that we MUST use a locking read in all data modifying
00683     SQL statements, because otherwise the execution would not be
00684     serializable, and also the results from the update could be
00685     unexpected if an obsolete consistent read view would be
00686     used. */
00687 
00688     enum_tx_isolation isolation_level= session->getTxIsolation();
00689 
00690     if (isolation_level != ISO_SERIALIZABLE
00691         && (lock_type == TL_READ || lock_type == TL_READ_NO_INSERT)
00692         && (sql_command == SQLCOM_INSERT_SELECT
00693       || sql_command == SQLCOM_UPDATE
00694       || sql_command == SQLCOM_CREATE_TABLE)) {
00695 
00696       /* If we either have innobase_locks_unsafe_for_binlog
00697       option set or this session is using READ COMMITTED
00698       isolation level and isolation level of the transaction
00699       is not set to serializable and MySQL is doing
00700       INSERT INTO...SELECT or UPDATE ... = (SELECT ...) or
00701       CREATE  ... SELECT... without FOR UPDATE or
00702       IN SHARE MODE in select, then we use consistent
00703       read for select. */
00704 
00705       ib_lock_mode= IB_LOCK_NONE;
00706     } else if (sql_command == SQLCOM_CHECKSUM) {
00707       /* Use consistent read for checksum table */
00708 
00709       ib_lock_mode= IB_LOCK_NONE;
00710     } else {
00711       ib_lock_mode= IB_LOCK_S;
00712     }
00713 
00714   } else if (lock_type != TL_IGNORE) {
00715 
00716     /* We set possible LOCK_X value in external_lock, not yet
00717     here even if this would be SELECT ... FOR UPDATE */
00718     ib_lock_mode= IB_LOCK_NONE;
00719   }
00720 
00721   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
00722 
00723     /* If we are not doing a LOCK TABLE, DISCARD/IMPORT
00724     TABLESPACE or TRUNCATE TABLE then allow multiple
00725     writers. Note that ALTER TABLE uses a TL_WRITE_ALLOW_READ
00726     < TL_WRITE_CONCURRENT_INSERT.
00727     */
00728 
00729     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT
00730          && lock_type <= TL_WRITE)
00731         && ! session->doing_tablespace_operation()
00732         && sql_command != SQLCOM_TRUNCATE
00733         && sql_command != SQLCOM_CREATE_TABLE) {
00734 
00735       lock_type = TL_WRITE_ALLOW_WRITE;
00736     }
00737 
00738     /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
00739     MySQL would use the lock TL_READ_NO_INSERT on t2, and that
00740     would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
00741     to t2. Convert the lock to a normal read lock to allow
00742     concurrent inserts to t2.
00743     */
00744 
00745     if (lock_type == TL_READ_NO_INSERT) {
00746 
00747       lock_type = TL_READ;
00748     }
00749 
00750     lock.type = lock_type;
00751   }
00752 
00753   *to++= &lock;
00754 
00755   return to;
00756 }
00757 
00758 void HailDBCursor::get_auto_increment(uint64_t, //offset,
00759                                               uint64_t, //increment,
00760                                               uint64_t, //nb_dis,
00761                                               uint64_t *first_value,
00762                                               uint64_t *nb_reserved_values)
00763 {
00764 fetch:
00765   *first_value= share->auto_increment_value.fetch_and_increment();
00766   if (*first_value == 0)
00767   {
00768     /* if it's zero, then we skip it... why? because of ass.
00769        set auto-inc to -1 and the sequence is:
00770        -1, 1.
00771        Zero is still "magic".
00772     */
00773     share->auto_increment_value.compare_and_swap(1, 0);
00774     goto fetch;
00775   }
00776   *nb_reserved_values= 1;
00777 }
00778 
00779 static const char* table_path_to_haildb_name(const char* name)
00780 {
00781   size_t l= strlen(name);
00782   static string datadict_path("data_dictionary/");
00783   static string sys_prefix("data_dictionary/haildb_");
00784   static string sys_table_prefix("HAILDB_");
00785 
00786   if (strncmp(name, sys_prefix.c_str(), sys_prefix.length()) == 0)
00787   {
00788     string find_name(name+datadict_path.length());
00789     std::transform(find_name.begin(), find_name.end(), find_name.begin(), ::toupper);
00790     boost::unordered_set<string>::iterator iter= haildb_system_table_names.find(find_name);
00791     if (iter != haildb_system_table_names.end())
00792       return iter->c_str()+sys_table_prefix.length();
00793   }
00794 
00795   int slashes= 2;
00796   while(slashes>0 && l > 0)
00797   {
00798     l--;
00799     if (name[l] == '/')
00800       slashes--;
00801   }
00802   if (slashes==0)
00803     l++;
00804 
00805   return &name[l];
00806 }
00807 
00808 static void TableIdentifier_to_haildb_name(const identifier::Table &identifier, std::string *str)
00809 {
00810   str->assign(table_path_to_haildb_name(identifier.getPath().c_str()));
00811 }
00812 
00813 HailDBCursor::HailDBCursor(drizzled::plugin::StorageEngine &engine_arg,
00814                            Table &table_arg)
00815   :Cursor(engine_arg, table_arg),
00816    ib_lock_mode(IB_LOCK_NONE),
00817    write_can_replace(false),
00818    blobroot(NULL)
00819 { }
00820 
00821 static unsigned int get_first_unique_index(drizzled::Table &table)
00822 {
00823   for (uint32_t k= 0; k < table.getShare()->keys; k++)
00824   {
00825     if (table.key_info[k].flags & HA_NOSAME)
00826     {
00827       return k;
00828     }
00829   }
00830 
00831   return 0;
00832 }
00833 
00834 int HailDBCursor::open(const char *name, int, uint32_t)
00835 {
00836   const char* haildb_table_name= table_path_to_haildb_name(name);
00837   ib_err_t err= ib_table_get_id(haildb_table_name, &table_id);
00838   bool has_hidden_primary_key= false;
00839   ib_id_t idx_id;
00840 
00841   if (err != DB_SUCCESS)
00842     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
00843 
00844   err= ib_cursor_open_table_using_id(table_id, NULL, &cursor);
00845   cursor_is_sec_index= false;
00846 
00847   if (err != DB_SUCCESS)
00848     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
00849 
00850   err= ib_index_get_id(haildb_table_name, "HIDDEN_PRIMARY", &idx_id);
00851 
00852   if (err == DB_SUCCESS)
00853     has_hidden_primary_key= true;
00854 
00855   int rc;
00856   share= get_share(name, has_hidden_primary_key, &rc);
00857   lock.init(&share->lock);
00858 
00859 
00860   if (getTable()->getShare()->getPrimaryKey() != MAX_KEY)
00861     ref_length= getTable()->key_info[getTable()->getShare()->getPrimaryKey()].key_length;
00862   else if (share->has_hidden_primary_key)
00863     ref_length= sizeof(uint64_t);
00864   else
00865   {
00866     unsigned int keynr= get_first_unique_index(*getTable());
00867     ref_length= getTable()->key_info[keynr].key_length;
00868   }
00869 
00870   in_table_scan= false;
00871 
00872   return(0);
00873 }
00874 
00875 int HailDBCursor::close(void)
00876 {
00877   ib_err_t err= ib_cursor_close(cursor);
00878   if (err != DB_SUCCESS)
00879     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
00880 
00881   free_share();
00882 
00883   delete blobroot;
00884   blobroot= NULL;
00885 
00886   return 0;
00887 }
00888 
00889 int HailDBCursor::external_lock(Session* session, int lock_type)
00890 {
00891   ib_cursor_stmt_begin(cursor);
00892 
00893   (void)session;
00894 
00895   if (lock_type == F_WRLCK)
00896   {
00897     /* SELECT ... FOR UPDATE or UPDATE TABLE */
00898     ib_lock_mode= IB_LOCK_X;
00899   }
00900   else
00901     ib_lock_mode= IB_LOCK_NONE;
00902 
00903   return 0;
00904 }
00905 
00906 static int create_table_add_field(ib_tbl_sch_t schema,
00907                                   const message::Table::Field &field,
00908                                   ib_err_t *err)
00909 {
00910   ib_col_attr_t column_attr= IB_COL_NONE;
00911 
00912   if (field.has_constraints() && field.constraints().is_notnull())
00913     column_attr= IB_COL_NOT_NULL;
00914 
00915   switch (field.type())
00916   {
00917   case message::Table::Field::VARCHAR:
00918     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_VARCHAR,
00919                                   column_attr, 0,
00920                                   field.string_options().length());
00921     break;
00922   case message::Table::Field::INTEGER:
00923     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_INT,
00924                                   column_attr, 0, 4);
00925     break;
00926   case message::Table::Field::BIGINT:
00927     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_INT,
00928                                   column_attr, 0, 8);
00929     break;
00930   case message::Table::Field::DOUBLE:
00931   case message::Table::Field::DATETIME:
00932     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_DOUBLE,
00933                                   column_attr, 0, sizeof(double));
00934     break;
00935   case message::Table::Field::ENUM:
00936     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_INT,
00937                                   column_attr, 0, 4);
00938     break;
00939   case message::Table::Field::DATE:
00940     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_INT,
00941                                   column_attr, 0, 4);
00942     break;
00943   case message::Table::Field::EPOCH:
00944     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_INT,
00945                                   column_attr, 0, 8);
00946     break;
00947   case message::Table::Field::BLOB:
00948     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_BLOB,
00949                                   column_attr, 0, 0);
00950     break;
00951   case message::Table::Field::DECIMAL:
00952     *err= ib_table_schema_add_col(schema, field.name().c_str(), IB_DECIMAL,
00953                                   column_attr, 0, 0);
00954     break;
00955   default:
00956     my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "Column Type");
00957     return(HA_ERR_UNSUPPORTED);
00958   }
00959 
00960   return 0;
00961 }
00962 
00963 static ib_err_t store_table_message(ib_trx_t transaction, const char* table_name, const drizzled::message::Table& table_message)
00964 {
00965   ib_crsr_t cursor;
00966   ib_tpl_t message_tuple;
00967   ib_err_t err;
00968   string serialized_message;
00969 
00970   err= ib_cursor_open_table(HAILDB_TABLE_DEFINITIONS_TABLE, transaction, &cursor);
00971   if (err != DB_SUCCESS)
00972     return err;
00973 
00974   message_tuple= ib_clust_read_tuple_create(cursor);
00975 
00976   err= ib_col_set_value(message_tuple, 0, table_name, strlen(table_name));
00977   if (err != DB_SUCCESS)
00978     goto cleanup;
00979 
00980   try {
00981     table_message.SerializeToString(&serialized_message);
00982   }
00983   catch (...)
00984   {
00985     goto cleanup;
00986   }
00987 
00988   err= ib_col_set_value(message_tuple, 1, serialized_message.c_str(),
00989                         serialized_message.length());
00990   if (err != DB_SUCCESS)
00991     goto cleanup;
00992 
00993   err= ib_cursor_insert_row(cursor, message_tuple);
00994 
00995 cleanup:
00996   ib_tuple_delete(message_tuple);
00997 
00998   ib_err_t cleanup_err= ib_cursor_close(cursor);
00999   if (err == DB_SUCCESS)
01000     err= cleanup_err;
01001 
01002   return err;
01003 }
01004 
01005 bool HailDBEngine::validateCreateTableOption(const std::string &key,
01006                                                      const std::string &state)
01007 {
01008   if (boost::iequals(key, "ROW_FORMAT"))
01009   {
01010     if (boost::iequals(state, "COMPRESSED"))
01011       return true;
01012 
01013     if (boost::iequals(state, "COMPACT"))
01014       return true;
01015 
01016     if (boost::iequals(state, "DYNAMIC"))
01017       return true;
01018 
01019     if (boost::iequals(state, "REDUNDANT"))
01020       return true;
01021   }
01022 
01023   return false;
01024 }
01025 
01026 static ib_tbl_fmt_t parse_ib_table_format(const std::string &value)
01027 {
01028   if (boost::iequals(value, "REDUNDANT"))
01029     return IB_TBL_REDUNDANT;
01030   else if (boost::iequals(value, "COMPACT"))
01031     return IB_TBL_COMPACT;
01032   else if (boost::iequals(value, "DYNAMIC"))
01033     return IB_TBL_DYNAMIC;
01034   else if (boost::iequals(value, "COMPRESSED"))
01035     return IB_TBL_COMPRESSED;
01036 
01037   assert(false); /* You need to add possible table formats here */
01038   return IB_TBL_COMPACT;
01039 }
01040 
01041 int HailDBEngine::doCreateTable(Session &session,
01042                                         Table& table_obj,
01043                                         const drizzled::identifier::Table &identifier,
01044                                         const drizzled::message::Table& table_message)
01045 {
01046   ib_tbl_sch_t haildb_table_schema= NULL;
01047 //  ib_idx_sch_t haildb_pkey= NULL;
01048   ib_trx_t haildb_schema_transaction;
01049   ib_id_t haildb_table_id;
01050   ib_err_t haildb_err= DB_SUCCESS;
01051   string haildb_table_name;
01052   bool has_explicit_pkey= false;
01053 
01054   (void)table_obj;
01055 
01056   if (table_message.type() == message::Table::TEMPORARY)
01057   {
01058     ib_bool_t create_db_err= ib_database_create(GLOBAL_TEMPORARY_EXT);
01059     if (create_db_err != IB_TRUE)
01060       return -1;
01061   }
01062 
01063   TableIdentifier_to_haildb_name(identifier, &haildb_table_name);
01064 
01065   ib_tbl_fmt_t haildb_table_format= IB_TBL_COMPACT;
01066 
01067   const size_t num_engine_options= table_message.engine().options_size();
01068   for (size_t x= 0; x < num_engine_options; x++)
01069   {
01070     const message::Engine::Option &engine_option= table_message.engine().options(x);
01071     if (boost::iequals(engine_option.name(), "ROW_FORMAT"))
01072     {
01073       haildb_table_format= parse_ib_table_format(engine_option.state());
01074     }
01075   }
01076 
01077   haildb_err= ib_table_schema_create(haildb_table_name.c_str(),
01078                                      &haildb_table_schema,
01079                                      haildb_table_format, 0);
01080 
01081   if (haildb_err != DB_SUCCESS)
01082   {
01083     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01084                         ER_CANT_CREATE_TABLE,
01085                         _("Cannot create table %s. HailDB Error %d (%s)\n"),
01086                         haildb_table_name.c_str(), haildb_err, ib_strerror(haildb_err));
01087     return ib_err_t_to_drizzle_error(&session, haildb_err);
01088   }
01089 
01090   for (int colnr= 0; colnr < table_message.field_size() ; colnr++)
01091   {
01092     const message::Table::Field field = table_message.field(colnr);
01093 
01094     int field_err= create_table_add_field(haildb_table_schema, field,
01095                                           &haildb_err);
01096 
01097     if (haildb_err != DB_SUCCESS || field_err != 0)
01098       ib_table_schema_delete(haildb_table_schema); /* cleanup */
01099 
01100     if (haildb_err != DB_SUCCESS)
01101     {
01102       push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01103                           ER_CANT_CREATE_TABLE,
01104                           _("Cannot create field %s on table %s."
01105                             " HailDB Error %d (%s)\n"),
01106                           field.name().c_str(), haildb_table_name.c_str(),
01107                           haildb_err, ib_strerror(haildb_err));
01108       return ib_err_t_to_drizzle_error(&session, haildb_err);
01109     }
01110     if (field_err != 0)
01111       return field_err;
01112   }
01113 
01114   bool has_primary= false;
01115   for (int indexnr= 0; indexnr < table_message.indexes_size() ; indexnr++)
01116   {
01117     const message::Table::Index &index = table_message.indexes(indexnr);
01118 
01119     ib_idx_sch_t haildb_index;
01120 
01121     haildb_err= ib_table_schema_add_index(haildb_table_schema, index.name().c_str(),
01122             &haildb_index);
01123     if (haildb_err != DB_SUCCESS)
01124       goto schema_error;
01125 
01126     if (index.is_primary())
01127     {
01128       has_primary= true;
01129       haildb_err= ib_index_schema_set_clustered(haildb_index);
01130       has_explicit_pkey= true;
01131       if (haildb_err != DB_SUCCESS)
01132         goto schema_error;
01133     }
01134 
01135     if (index.is_unique())
01136     {
01137       haildb_err= ib_index_schema_set_unique(haildb_index);
01138       if (haildb_err != DB_SUCCESS)
01139         goto schema_error;
01140     }
01141 
01142     assert(index.type() == message::Table::Index::UNKNOWN_INDEX);
01143 
01144     for (int partnr= 0; partnr < index.index_part_size(); partnr++)
01145     {
01146       const message::Table::Index::IndexPart part= index.index_part(partnr);
01147       const message::Table::Field::FieldType part_type= table_message.field(part.fieldnr()).type();
01148       uint64_t compare_length= 0;
01149 
01150       if (part_type == message::Table::Field::BLOB
01151           || part_type == message::Table::Field::VARCHAR)
01152         compare_length= part.compare_length();
01153 
01154       haildb_err= ib_index_schema_add_col(haildb_index,
01155                             table_message.field(part.fieldnr()).name().c_str(),
01156                                           compare_length);
01157       if (haildb_err != DB_SUCCESS)
01158         goto schema_error;
01159     }
01160 
01161     if (! has_primary && index.is_unique())
01162     {
01163       haildb_err= ib_index_schema_set_clustered(haildb_index);
01164       has_explicit_pkey= true;
01165       if (haildb_err != DB_SUCCESS)
01166         goto schema_error;
01167     }
01168 
01169   }
01170 
01171   if (! has_explicit_pkey)
01172   {
01173     ib_idx_sch_t haildb_index;
01174 
01175     haildb_err= ib_table_schema_add_col(haildb_table_schema, "hidden_primary_key_col",
01176                                         IB_INT, IB_COL_NOT_NULL, 0, 8);
01177 
01178     haildb_err= ib_table_schema_add_index(haildb_table_schema, "HIDDEN_PRIMARY",
01179                                           &haildb_index);
01180     if (haildb_err != DB_SUCCESS)
01181       goto schema_error;
01182 
01183     haildb_err= ib_index_schema_set_clustered(haildb_index);
01184     if (haildb_err != DB_SUCCESS)
01185       goto schema_error;
01186 
01187     haildb_err= ib_index_schema_add_col(haildb_index, "hidden_primary_key_col", 0);
01188     if (haildb_err != DB_SUCCESS)
01189       goto schema_error;
01190   }
01191 
01192   haildb_schema_transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
01193   haildb_err= ib_schema_lock_exclusive(haildb_schema_transaction);
01194   if (haildb_err != DB_SUCCESS)
01195   {
01196     ib_err_t rollback_err= ib_trx_rollback(haildb_schema_transaction);
01197     ib_table_schema_delete(haildb_table_schema);
01198 
01199     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01200                         ER_CANT_CREATE_TABLE,
01201                         _("Cannot Lock HailDB Data Dictionary. HailDB Error %d (%s)\n"),
01202                         haildb_err, ib_strerror(haildb_err));
01203 
01204     assert (rollback_err == DB_SUCCESS);
01205 
01206     return HA_ERR_GENERIC;
01207   }
01208 
01209   haildb_err= ib_table_create(haildb_schema_transaction, haildb_table_schema,
01210                               &haildb_table_id);
01211 
01212   if (haildb_err != DB_SUCCESS)
01213   {
01214     ib_err_t rollback_err= ib_trx_rollback(haildb_schema_transaction);
01215     ib_table_schema_delete(haildb_table_schema);
01216 
01217     if (haildb_err == DB_TABLE_IS_BEING_USED)
01218       return EEXIST;
01219 
01220     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01221                         ER_CANT_CREATE_TABLE,
01222                         _("Cannot create table %s. HailDB Error %d (%s)\n"),
01223                         haildb_table_name.c_str(),
01224                         haildb_err, ib_strerror(haildb_err));
01225 
01226     assert (rollback_err == DB_SUCCESS);
01227     return HA_ERR_GENERIC;
01228   }
01229 
01230   if (table_message.type() == message::Table::TEMPORARY)
01231   {
01232     session.getMessageCache().storeTableMessage(identifier, table_message);
01233     haildb_err= DB_SUCCESS;
01234   }
01235   else
01236     haildb_err= store_table_message(haildb_schema_transaction,
01237                                     haildb_table_name.c_str(),
01238                                     table_message);
01239 
01240   if (haildb_err == DB_SUCCESS)
01241     haildb_err= ib_trx_commit(haildb_schema_transaction);
01242   else
01243     haildb_err= ib_trx_rollback(haildb_schema_transaction);
01244 
01245 schema_error:
01246   ib_table_schema_delete(haildb_table_schema);
01247 
01248   if (haildb_err != DB_SUCCESS)
01249   {
01250     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01251                         ER_CANT_CREATE_TABLE,
01252                         _("Cannot create table %s. HailDB Error %d (%s)\n"),
01253                         haildb_table_name.c_str(),
01254                         haildb_err, ib_strerror(haildb_err));
01255     return ib_err_t_to_drizzle_error(&session, haildb_err);
01256   }
01257 
01258   return 0;
01259 }
01260 
01261 static int delete_table_message_from_haildb(ib_trx_t transaction, const char* table_name)
01262 {
01263   ib_crsr_t cursor;
01264   ib_tpl_t search_tuple;
01265   int res;
01266   ib_err_t err;
01267 
01268   err= ib_cursor_open_table(HAILDB_TABLE_DEFINITIONS_TABLE, transaction, &cursor);
01269   if (err != DB_SUCCESS)
01270     return err;
01271 
01272   search_tuple= ib_clust_search_tuple_create(cursor);
01273 
01274   err= ib_col_set_value(search_tuple, 0, table_name, strlen(table_name));
01275   if (err != DB_SUCCESS)
01276     goto rollback;
01277 
01278 //  ib_cursor_set_match_mode(cursor, IB_EXACT_MATCH);
01279 
01280   err= ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
01281   if (err == DB_RECORD_NOT_FOUND || res != 0)
01282     goto rollback;
01283 
01284   err= ib_cursor_delete_row(cursor);
01285   assert (err == DB_SUCCESS);
01286 
01287 rollback:
01288   ib_err_t rollback_err= ib_cursor_close(cursor);
01289   if (err == DB_SUCCESS)
01290     err= rollback_err;
01291 
01292   ib_tuple_delete(search_tuple);
01293 
01294   return err;
01295 }
01296 
01297 int HailDBEngine::doDropTable(Session &session,
01298                                       const identifier::Table &identifier)
01299 {
01300   ib_trx_t haildb_schema_transaction;
01301   ib_err_t haildb_err;
01302   string haildb_table_name;
01303 
01304   TableIdentifier_to_haildb_name(identifier, &haildb_table_name);
01305 
01306   haildb_schema_transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
01307   haildb_err= ib_schema_lock_exclusive(haildb_schema_transaction);
01308   if (haildb_err != DB_SUCCESS)
01309   {
01310     ib_err_t rollback_err= ib_trx_rollback(haildb_schema_transaction);
01311 
01312     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01313                         ER_CANT_DELETE_FILE,
01314                         _("Cannot Lock HailDB Data Dictionary. HailDB Error %d (%s)\n"),
01315                         haildb_err, ib_strerror(haildb_err));
01316 
01317     assert (rollback_err == DB_SUCCESS);
01318 
01319     return HA_ERR_GENERIC;
01320   }
01321 
01322   if (identifier.getType() == message::Table::TEMPORARY)
01323   {
01324       session.getMessageCache().removeTableMessage(identifier);
01325       delete_table_message_from_haildb(haildb_schema_transaction,
01326                                        haildb_table_name.c_str());
01327   }
01328   else
01329   {
01330     if (delete_table_message_from_haildb(haildb_schema_transaction, haildb_table_name.c_str()) != DB_SUCCESS)
01331     {
01332       ib_schema_unlock(haildb_schema_transaction);
01333       ib_err_t rollback_err= ib_trx_rollback(haildb_schema_transaction);
01334       assert(rollback_err == DB_SUCCESS);
01335       return HA_ERR_GENERIC;
01336     }
01337   }
01338 
01339   haildb_err= ib_table_drop(haildb_schema_transaction, haildb_table_name.c_str());
01340 
01341   if (haildb_err == DB_TABLE_NOT_FOUND)
01342   {
01343     haildb_err= ib_trx_rollback(haildb_schema_transaction);
01344     assert(haildb_err == DB_SUCCESS);
01345     return ENOENT;
01346   }
01347   else if (haildb_err != DB_SUCCESS)
01348   {
01349     ib_err_t rollback_err= ib_trx_rollback(haildb_schema_transaction);
01350 
01351     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01352                         ER_CANT_DELETE_FILE,
01353                         _("Cannot DROP table %s. HailDB Error %d (%s)\n"),
01354                         haildb_table_name.c_str(),
01355                         haildb_err, ib_strerror(haildb_err));
01356 
01357     assert(rollback_err == DB_SUCCESS);
01358 
01359     return HA_ERR_GENERIC;
01360   }
01361 
01362   haildb_err= ib_trx_commit(haildb_schema_transaction);
01363   if (haildb_err != DB_SUCCESS)
01364   {
01365     ib_err_t rollback_err= ib_trx_rollback(haildb_schema_transaction);
01366 
01367     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01368                         ER_CANT_DELETE_FILE,
01369                         _("Cannot DROP table %s. HailDB Error %d (%s)\n"),
01370                         haildb_table_name.c_str(),
01371                         haildb_err, ib_strerror(haildb_err));
01372 
01373     assert(rollback_err == DB_SUCCESS);
01374     return HA_ERR_GENERIC;
01375   }
01376 
01377   return 0;
01378 }
01379 
01380 static ib_err_t rename_table_message(ib_trx_t transaction, const identifier::Table &from_identifier, const identifier::Table &to_identifier)
01381 {
01382   ib_crsr_t cursor;
01383   ib_tpl_t search_tuple;
01384   ib_tpl_t read_tuple;
01385   ib_tpl_t update_tuple;
01386   int res;
01387   ib_err_t err;
01388   ib_err_t rollback_err;
01389   const char *message;
01390   ib_ulint_t message_len;
01391   drizzled::message::Table table_message;
01392   string from_haildb_table_name;
01393   string to_haildb_table_name;
01394   const char *from;
01395   const char *to;
01396   string serialized_message;
01397   ib_col_meta_t col_meta;
01398 
01399   TableIdentifier_to_haildb_name(from_identifier, &from_haildb_table_name);
01400   TableIdentifier_to_haildb_name(to_identifier, &to_haildb_table_name);
01401 
01402   from= from_haildb_table_name.c_str();
01403   to= to_haildb_table_name.c_str();
01404 
01405   err= ib_cursor_open_table(HAILDB_TABLE_DEFINITIONS_TABLE, transaction, &cursor);
01406   if (err != DB_SUCCESS)
01407   {
01408     rollback_err= ib_trx_rollback(transaction);
01409     assert(rollback_err == DB_SUCCESS);
01410     return err;
01411   }
01412 
01413   search_tuple= ib_clust_search_tuple_create(cursor);
01414   read_tuple= ib_clust_read_tuple_create(cursor);
01415 
01416   err= ib_col_set_value(search_tuple, 0, from, strlen(from));
01417   if (err != DB_SUCCESS)
01418     goto rollback;
01419 
01420 //  ib_cursor_set_match_mode(cursor, IB_EXACT_MATCH);
01421 
01422   err= ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
01423   if (err == DB_RECORD_NOT_FOUND || res != 0)
01424     goto rollback;
01425 
01426   err= ib_cursor_read_row(cursor, read_tuple);
01427   if (err == DB_RECORD_NOT_FOUND || res != 0)
01428     goto rollback;
01429 
01430   message= (const char*)ib_col_get_value(read_tuple, 1);
01431   message_len= ib_col_get_meta(read_tuple, 1, &col_meta);
01432 
01433   if (table_message.ParseFromArray(message, message_len) == false)
01434     goto rollback;
01435 
01436   table_message.set_name(to_identifier.getTableName());
01437   table_message.set_schema(to_identifier.getSchemaName());
01438 
01439   update_tuple= ib_clust_read_tuple_create(cursor);
01440 
01441   err= ib_tuple_copy(update_tuple, read_tuple);
01442   assert(err == DB_SUCCESS);
01443 
01444   err= ib_col_set_value(update_tuple, 0, to, strlen(to));
01445 
01446   try {
01447     table_message.SerializeToString(&serialized_message);
01448   }
01449   catch (...)
01450   {
01451     goto rollback;
01452   }
01453 
01454   err= ib_col_set_value(update_tuple, 1, serialized_message.c_str(),
01455                         serialized_message.length());
01456 
01457   err= ib_cursor_update_row(cursor, read_tuple, update_tuple);
01458 
01459 
01460   ib_tuple_delete(update_tuple);
01461   ib_tuple_delete(read_tuple);
01462   ib_tuple_delete(search_tuple);
01463 
01464   err= ib_cursor_close(cursor);
01465 
01466 rollback:
01467   return err;
01468 }
01469 
01470 int HailDBEngine::doRenameTable(drizzled::Session &session,
01471                                         const drizzled::identifier::Table &from,
01472                                         const drizzled::identifier::Table &to)
01473 {
01474   ib_trx_t haildb_schema_transaction;
01475   ib_err_t err;
01476   string from_haildb_table_name;
01477   string to_haildb_table_name;
01478 
01479   if (to.getType() == message::Table::TEMPORARY
01480       && from.getType() == message::Table::TEMPORARY)
01481   {
01482     session.getMessageCache().renameTableMessage(from, to);
01483     return 0;
01484   }
01485 
01486   TableIdentifier_to_haildb_name(from, &from_haildb_table_name);
01487   TableIdentifier_to_haildb_name(to, &to_haildb_table_name);
01488 
01489   haildb_schema_transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
01490   err= ib_schema_lock_exclusive(haildb_schema_transaction);
01491   if (err != DB_SUCCESS)
01492   {
01493     push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01494                         ER_CANT_DELETE_FILE,
01495                         _("Cannot Lock HailDB Data Dictionary. HailDB Error %d (%s)\n"),
01496                         err, ib_strerror(err));
01497 
01498     goto rollback;
01499   }
01500 
01501   err= ib_table_rename(haildb_schema_transaction,
01502                        from_haildb_table_name.c_str(),
01503                        to_haildb_table_name.c_str());
01504   if (err != DB_SUCCESS)
01505     goto rollback;
01506 
01507   err= rename_table_message(haildb_schema_transaction, from, to);
01508 
01509   if (err != DB_SUCCESS)
01510     goto rollback;
01511 
01512   err= ib_trx_commit(haildb_schema_transaction);
01513   if (err != DB_SUCCESS)
01514     goto rollback;
01515 
01516   return 0;
01517 rollback:
01518   ib_err_t rollback_err= ib_schema_unlock(haildb_schema_transaction);
01519   assert(rollback_err == DB_SUCCESS);
01520   rollback_err= ib_trx_rollback(haildb_schema_transaction);
01521   assert(rollback_err == DB_SUCCESS);
01522   return ib_err_t_to_drizzle_error(&session, err);
01523 }
01524 
01525 void HailDBEngine::getTableNamesInSchemaFromHailDB(
01526                                  const drizzled::identifier::Schema &schema,
01527                                  drizzled::plugin::TableNameList *set_of_names,
01528                                  drizzled::identifier::Table::vector *identifiers)
01529 {
01530   ib_trx_t   transaction;
01531   ib_crsr_t  cursor;
01532   /*
01533     Why not use getPath()?
01534   */
01535   string search_string(schema.getSchemaName());
01536 
01537   boost::algorithm::to_lower(search_string);
01538 
01539   search_string.append("/");
01540 
01541   transaction = ib_trx_begin(IB_TRX_REPEATABLE_READ);
01542   ib_err_t haildb_err= ib_schema_lock_exclusive(transaction);
01543   assert(haildb_err == DB_SUCCESS); /* FIXME: doGetTableNames needs to be able to return error */
01544 
01545   if (search_string.compare("data_dictionary/") == 0)
01546   {
01547     if (set_of_names)
01548     {
01549       BOOST_FOREACH(std::string table_name, haildb_system_table_names)
01550       {
01551         set_of_names->insert(table_name);
01552       }
01553     }
01554     if (identifiers)
01555     {
01556       BOOST_FOREACH(std::string table_name, haildb_system_table_names)
01557       {
01558         identifiers->push_back(identifier::Table(schema.getSchemaName(),
01559                                                table_name));
01560       }
01561     }
01562   }
01563 
01564   haildb_err= ib_cursor_open_table("SYS_TABLES", transaction, &cursor);
01565   assert(haildb_err == DB_SUCCESS); /* FIXME */
01566 
01567   ib_tpl_t read_tuple;
01568   ib_tpl_t search_tuple;
01569 
01570   read_tuple= ib_clust_read_tuple_create(cursor);
01571   search_tuple= ib_clust_search_tuple_create(cursor);
01572 
01573   haildb_err= ib_col_set_value(search_tuple, 0, search_string.c_str(),
01574                                search_string.length());
01575   assert (haildb_err == DB_SUCCESS); // FIXME
01576 
01577   int res;
01578   haildb_err = ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
01579   // fixme: check error above
01580 
01581   while (haildb_err == DB_SUCCESS)
01582   {
01583     haildb_err= ib_cursor_read_row(cursor, read_tuple);
01584 
01585     const char *table_name;
01586     int table_name_len;
01587     ib_col_meta_t column_metadata;
01588 
01589     table_name= (const char*)ib_col_get_value(read_tuple, 0);
01590     table_name_len=  ib_col_get_meta(read_tuple, 0, &column_metadata);
01591 
01592     if (search_string.compare(0, search_string.length(),
01593                               table_name, search_string.length()) == 0)
01594     {
01595       const char *just_table_name= strchr(table_name, '/');
01596       assert(just_table_name);
01597       just_table_name++; /* skip over '/' */
01598       if (set_of_names)
01599         set_of_names->insert(just_table_name);
01600       if (identifiers)
01601         identifiers->push_back(identifier::Table(schema.getSchemaName(), just_table_name));
01602     }
01603 
01604 
01605     haildb_err= ib_cursor_next(cursor);
01606     read_tuple= ib_tuple_clear(read_tuple);
01607   }
01608 
01609   ib_tuple_delete(read_tuple);
01610   ib_tuple_delete(search_tuple);
01611 
01612   haildb_err= ib_cursor_close(cursor);
01613   assert(haildb_err == DB_SUCCESS); // FIXME
01614 
01615   haildb_err= ib_trx_commit(transaction);
01616   assert(haildb_err == DB_SUCCESS); // FIXME
01617 }
01618 
01619 void HailDBEngine::doGetTableIdentifiers(drizzled::CachedDirectory &,
01620                                                  const drizzled::identifier::Schema &schema,
01621                                                  drizzled::identifier::Table::vector &identifiers)
01622 {
01623   getTableNamesInSchemaFromHailDB(schema, NULL, &identifiers);
01624 }
01625 
01626 static int read_table_message_from_haildb(const char* table_name, drizzled::message::Table *table_message)
01627 {
01628   ib_trx_t transaction;
01629   ib_tpl_t search_tuple;
01630   ib_tpl_t read_tuple;
01631   ib_crsr_t cursor;
01632   const char *message;
01633   ib_ulint_t message_len;
01634   ib_col_meta_t col_meta;
01635   int res;
01636   ib_err_t err;
01637   ib_err_t rollback_err;
01638 
01639   transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
01640   err= ib_schema_lock_exclusive(transaction);
01641   if (err != DB_SUCCESS)
01642   {
01643     rollback_err= ib_trx_rollback(transaction);
01644     assert(rollback_err == DB_SUCCESS);
01645     return err;
01646   }
01647 
01648   err= ib_cursor_open_table(HAILDB_TABLE_DEFINITIONS_TABLE, transaction, &cursor);
01649   if (err != DB_SUCCESS)
01650   {
01651     rollback_err= ib_trx_rollback(transaction);
01652     assert(rollback_err == DB_SUCCESS);
01653     return err;
01654   }
01655 
01656   search_tuple= ib_clust_search_tuple_create(cursor);
01657   read_tuple= ib_clust_read_tuple_create(cursor);
01658 
01659   err= ib_col_set_value(search_tuple, 0, table_name, strlen(table_name));
01660   if (err != DB_SUCCESS)
01661     goto rollback;
01662 
01663 //  ib_cursor_set_match_mode(cursor, IB_EXACT_MATCH);
01664 
01665   err= ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
01666   if (err == DB_RECORD_NOT_FOUND || res != 0)
01667     goto rollback;
01668 
01669   err= ib_cursor_read_row(cursor, read_tuple);
01670   if (err == DB_RECORD_NOT_FOUND || res != 0)
01671     goto rollback;
01672 
01673   message= (const char*)ib_col_get_value(read_tuple, 1);
01674   message_len= ib_col_get_meta(read_tuple, 1, &col_meta);
01675 
01676   if (table_message->ParseFromArray(message, message_len) == false)
01677     goto rollback;
01678 
01679   ib_tuple_delete(search_tuple);
01680   ib_tuple_delete(read_tuple);
01681   err= ib_cursor_close(cursor);
01682   if (err != DB_SUCCESS)
01683     goto rollback_close_err;
01684   err= ib_trx_commit(transaction);
01685   if (err != DB_SUCCESS)
01686     goto rollback_close_err;
01687 
01688   return 0;
01689 
01690 rollback:
01691   ib_tuple_delete(search_tuple);
01692   ib_tuple_delete(read_tuple);
01693   rollback_err= ib_cursor_close(cursor);
01694   assert(rollback_err == DB_SUCCESS);
01695 rollback_close_err:
01696   ib_schema_unlock(transaction);
01697   rollback_err= ib_trx_rollback(transaction);
01698   assert(rollback_err == DB_SUCCESS);
01699 
01700   if (strcmp(table_name, HAILDB_TABLE_DEFINITIONS_TABLE) == 0)
01701   {
01702     message::Engine *engine= table_message->mutable_engine();
01703     engine->set_name("InnoDB");
01704     table_message->set_name("haildb_table_definitions");
01705     table_message->set_schema("data_dictionary");
01706     table_message->set_type(message::Table::STANDARD);
01707     table_message->set_creation_timestamp(0);
01708     table_message->set_update_timestamp(0);
01709 
01710     message::Table::TableOptions *options= table_message->mutable_options();
01711     options->set_collation_id(my_charset_bin.number);
01712     options->set_collation(my_charset_bin.name);
01713 
01714     message::Table::Field *field= table_message->add_field();
01715     field->set_name("table_name");
01716     field->set_type(message::Table::Field::VARCHAR);
01717     message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
01718     stropt->set_length(IB_MAX_TABLE_NAME_LEN);
01719     stropt->set_collation_id(my_charset_bin.number);
01720     stropt->set_collation(my_charset_bin.name);
01721 
01722     field= table_message->add_field();
01723     field->set_name("message");
01724     field->set_type(message::Table::Field::BLOB);
01725     stropt= field->mutable_string_options();
01726     stropt->set_collation_id(my_charset_bin.number);
01727     stropt->set_collation(my_charset_bin.name);
01728 
01729     message::Table::Index *index= table_message->add_indexes();
01730     index->set_name("PRIMARY");
01731     index->set_is_primary(true);
01732     index->set_is_unique(true);
01733     index->set_type(message::Table::Index::BTREE);
01734     index->set_key_length(IB_MAX_TABLE_NAME_LEN);
01735     message::Table::Index::IndexPart *part= index->add_index_part();
01736     part->set_fieldnr(0);
01737     part->set_compare_length(IB_MAX_TABLE_NAME_LEN);
01738 
01739     return 0;
01740   }
01741 
01742   return -1;
01743 }
01744 
01745 int HailDBEngine::doGetTableDefinition(Session &session,
01746                                                const identifier::Table &identifier,
01747                                                drizzled::message::Table &table)
01748 {
01749   ib_crsr_t haildb_cursor= NULL;
01750   string haildb_table_name;
01751 
01752   /* Check temporary tables!? */
01753   if (session.getMessageCache().getTableMessage(identifier, table))
01754     return EEXIST;
01755 
01756   TableIdentifier_to_haildb_name(identifier, &haildb_table_name);
01757 
01758   if (ib_cursor_open_table(haildb_table_name.c_str(), NULL, &haildb_cursor) != DB_SUCCESS)
01759     return ENOENT;
01760 
01761   ib_err_t err= ib_cursor_close(haildb_cursor);
01762 
01763   assert (err == DB_SUCCESS);
01764 
01765   if (read_table_message_from_haildb(haildb_table_name.c_str(), &table) != 0)
01766   {
01767     if (get_haildb_system_table_message(haildb_table_name.c_str(), &table) == 0)
01768       return EEXIST;
01769   }
01770 
01771   return EEXIST;
01772 }
01773 
01774 bool HailDBEngine::doDoesTableExist(Session &,
01775                                     const identifier::Table& identifier)
01776 {
01777   ib_crsr_t haildb_cursor;
01778   string haildb_table_name;
01779 
01780   TableIdentifier_to_haildb_name(identifier, &haildb_table_name);
01781 
01782   boost::unordered_set<string>::iterator iter= haildb_system_table_names.find(identifier.getTableName());
01783   if (iter != haildb_system_table_names.end())
01784     return true;
01785 
01786   if (ib_cursor_open_table(haildb_table_name.c_str(), NULL, &haildb_cursor) != DB_SUCCESS)
01787     return false;
01788 
01789   ib_err_t err= ib_cursor_close(haildb_cursor);
01790   assert(err == DB_SUCCESS);
01791 
01792   return true;
01793 }
01794 
01795 const char *HailDBCursor::index_type(uint32_t)
01796 {
01797   return("BTREE");
01798 }
01799 
01800 static ib_err_t write_row_to_haildb_tuple(const unsigned char* buf,
01801                                           Field **fields, ib_tpl_t tuple)
01802 {
01803   int colnr= 0;
01804   ib_err_t err= DB_ERROR;
01805   ptrdiff_t row_offset= buf - (*fields)->getTable()->getInsertRecord();
01806 
01807   for (Field **field= fields; *field; field++, colnr++)
01808   {
01809     (**field).move_field_offset(row_offset);
01810 
01811     if (! (**field).isWriteSet() && (**field).is_null())
01812     {
01813       (**field).move_field_offset(-row_offset);
01814       continue;
01815     }
01816 
01817     if ((**field).is_null())
01818     {
01819       err= ib_col_set_value(tuple, colnr, NULL, IB_SQL_NULL);
01820       assert(err == DB_SUCCESS);
01821       (**field).move_field_offset(-row_offset);
01822       continue;
01823     }
01824 
01825     if ((**field).type() == DRIZZLE_TYPE_VARCHAR)
01826     {
01827       /* To get around the length bytes (1 or 2) at (**field).ptr
01828          we can use Field_varstring::val_str to a String
01829          to get a pointer to the real string without copying it.
01830       */
01831       String str;
01832       (**field).setReadSet();
01833       (**field).val_str_internal(&str);
01834       err= ib_col_set_value(tuple, colnr, str.ptr(), str.length());
01835     }
01836     else if ((**field).type() == DRIZZLE_TYPE_ENUM)
01837     {
01838       err= ib_tuple_write_u32(tuple, colnr, *((ib_u32_t*)(*field)->ptr));
01839     }
01840     else if ((**field).type() == DRIZZLE_TYPE_DATE)
01841     {
01842       (**field).setReadSet();
01843       err= ib_tuple_write_u32(tuple, colnr, (*field)->val_int());
01844     }
01845     else if ((**field).type() == DRIZZLE_TYPE_BLOB)
01846     {
01847       Field_blob *blob= reinterpret_cast<Field_blob*>(*field);
01848       unsigned char* blob_ptr;
01849       uint32_t blob_length= blob->get_length();
01850       blob->get_ptr(&blob_ptr);
01851       err= ib_col_set_value(tuple, colnr, blob_ptr, blob_length);
01852     }
01853     else
01854     {
01855       err= ib_col_set_value(tuple, colnr, (*field)->ptr, (*field)->data_length());
01856     }
01857 
01858     assert (err == DB_SUCCESS);
01859 
01860     (**field).move_field_offset(-row_offset);
01861   }
01862 
01863   return err;
01864 }
01865 
01866 static uint64_t innobase_get_int_col_max_value(const Field* field)
01867 {
01868   uint64_t  max_value = 0;
01869 
01870   switch(field->key_type()) {
01871     /* TINY */
01872   case HA_KEYTYPE_BINARY:
01873     max_value = 0xFFULL;
01874     break;
01875     /* LONG */
01876   case HA_KEYTYPE_ULONG_INT:
01877     max_value = 0xFFFFFFFFULL;
01878     break;
01879   case HA_KEYTYPE_LONG_INT:
01880     max_value = 0x7FFFFFFFULL;
01881     break;
01882     /* BIG */
01883   case HA_KEYTYPE_ULONGLONG:
01884     max_value = 0xFFFFFFFFFFFFFFFFULL;
01885     break;
01886   case HA_KEYTYPE_LONGLONG:
01887     max_value = 0x7FFFFFFFFFFFFFFFULL;
01888     break;
01889   case HA_KEYTYPE_DOUBLE:
01890     /* We use the maximum as per IEEE754-2008 standard, 2^53 */
01891     max_value = 0x20000000000000ULL;
01892     break;
01893   default:
01894     assert(false);
01895   }
01896 
01897   return(max_value);
01898 }
01899 
01900 int HailDBCursor::doInsertRecord(unsigned char *record)
01901 {
01902   ib_err_t err;
01903   int ret= 0;
01904 
01905   ib_trx_t transaction= *get_trx(getTable()->in_use);
01906 
01907   tuple= ib_clust_read_tuple_create(cursor);
01908 
01909   if (cursor_is_sec_index)
01910   {
01911     err= ib_cursor_close(cursor);
01912     assert(err == DB_SUCCESS);
01913 
01914     err= ib_cursor_open_table_using_id(table_id, transaction, &cursor);
01915 
01916     if (err != DB_SUCCESS)
01917       return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
01918 
01919     cursor_is_sec_index= false;
01920   }
01921   else
01922   {
01923     ib_cursor_attach_trx(cursor, transaction);
01924   }
01925 
01926   err= ib_cursor_first(cursor);
01927   if (current_session->lex().sql_command == SQLCOM_CREATE_TABLE
01928       && err == DB_MISSING_HISTORY)
01929   {
01930     /* See https://bugs.launchpad.net/drizzle/+bug/556978
01931      *
01932      * In CREATE SELECT, transaction is started in ::store_lock
01933      * at the start of the statement, before the table is created.
01934      * This means the table doesn't exist in our snapshot,
01935      * and we get a DB_MISSING_HISTORY error on ib_cursor_first().
01936      * The way to get around this is to here, restart the transaction
01937      * and continue.
01938      *
01939      * yuck.
01940      */
01941 
01942     HailDBEngine *storage_engine= static_cast<HailDBEngine*>(getEngine());
01943     err= ib_cursor_reset(cursor);
01944     storage_engine->doCommit(current_session, true);
01945     storage_engine->doStartTransaction(current_session, START_TRANS_NO_OPTIONS);
01946     transaction= *get_trx(getTable()->in_use);
01947     assert(err == DB_SUCCESS);
01948     ib_cursor_attach_trx(cursor, transaction);
01949     err= ib_cursor_first(cursor);
01950   }
01951 
01952   assert(err == DB_SUCCESS || err == DB_END_OF_INDEX);
01953 
01954 
01955   if (getTable()->next_number_field)
01956   {
01957     update_auto_increment();
01958 
01959     uint64_t temp_auto= getTable()->next_number_field->val_int();
01960 
01961     if (temp_auto <= innobase_get_int_col_max_value(getTable()->next_number_field))
01962     {
01963       while (true)
01964       {
01965         uint64_t fetched_auto= share->auto_increment_value;
01966 
01967         if (temp_auto >= fetched_auto)
01968         {
01969           uint64_t store_value= temp_auto+1;
01970           if (store_value == 0)
01971             store_value++;
01972 
01973           if (share->auto_increment_value.compare_and_swap(store_value, fetched_auto) == fetched_auto)
01974             break;
01975         }
01976         else
01977           break;
01978       }
01979     }
01980 
01981   }
01982 
01983   write_row_to_haildb_tuple(record, getTable()->getFields(), tuple);
01984 
01985   if (share->has_hidden_primary_key)
01986   {
01987     err= ib_tuple_write_u64(tuple, getTable()->getShare()->sizeFields(),
01988                             share->hidden_pkey_auto_increment_value.fetch_and_increment());
01989   }
01990 
01991   err= ib_cursor_insert_row(cursor, tuple);
01992 
01993   if (err == DB_DUPLICATE_KEY)
01994   {
01995     if (write_can_replace)
01996     {
01997       store_key_value_from_haildb(getTable()->key_info + getTable()->getShare()->getPrimaryKey(),
01998                                   ref, ref_length, record);
01999 
02000       ib_tpl_t search_tuple= ib_clust_search_tuple_create(cursor);
02001 
02002       fill_ib_search_tpl_from_drizzle_key(search_tuple,
02003                                           getTable()->key_info + 0,
02004                                           ref, ref_length);
02005 
02006       int res;
02007       err= ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
02008       assert(err == DB_SUCCESS);
02009       ib_tuple_delete(search_tuple);
02010 
02011       tuple= ib_tuple_clear(tuple);
02012       err= ib_cursor_delete_row(cursor);
02013 
02014       err= ib_cursor_first(cursor);
02015       assert(err == DB_SUCCESS || err == DB_END_OF_INDEX);
02016 
02017       write_row_to_haildb_tuple(record, getTable()->getFields(), tuple);
02018 
02019       err= ib_cursor_insert_row(cursor, tuple);
02020       assert(err==DB_SUCCESS); // probably be nice and process errors
02021     }
02022     else
02023       ret= HA_ERR_FOUND_DUPP_KEY;
02024   }
02025   else if (err != DB_SUCCESS)
02026     ret= ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02027 
02028   tuple= ib_tuple_clear(tuple);
02029   ib_tuple_delete(tuple);
02030   tuple= NULL;
02031   err= ib_cursor_reset(cursor);
02032 
02033   return ret;
02034 }
02035 
02036 int HailDBCursor::doUpdateRecord(const unsigned char *old_data,
02037                                  unsigned char *new_data)
02038 {
02039   ib_tpl_t update_tuple;
02040   ib_err_t err;
02041   bool created_tuple= false;
02042 
02043   update_tuple= ib_clust_read_tuple_create(cursor);
02044 
02045   if (tuple == NULL)
02046   {
02047     ib_trx_t transaction= *get_trx(getTable()->in_use);
02048 
02049     if (cursor_is_sec_index)
02050     {
02051       err= ib_cursor_close(cursor);
02052       assert(err == DB_SUCCESS);
02053 
02054       err= ib_cursor_open_table_using_id(table_id, transaction, &cursor);
02055 
02056       if (err != DB_SUCCESS)
02057         return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02058       cursor_is_sec_index= false;
02059     }
02060     else
02061     {
02062       ib_cursor_attach_trx(cursor, transaction);
02063     }
02064 
02065     store_key_value_from_haildb(getTable()->key_info + getTable()->getShare()->getPrimaryKey(),
02066                                   ref, ref_length, old_data);
02067 
02068     ib_tpl_t search_tuple= ib_clust_search_tuple_create(cursor);
02069 
02070     fill_ib_search_tpl_from_drizzle_key(search_tuple,
02071                                         getTable()->key_info + 0,
02072                                         ref, ref_length);
02073 
02074     err= ib_cursor_set_lock_mode(cursor, IB_LOCK_X);
02075     assert(err == DB_SUCCESS);
02076 
02077     int res;
02078     err= ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
02079     assert(err == DB_SUCCESS);
02080 
02081     tuple= ib_clust_read_tuple_create(cursor);
02082 
02083     err= ib_cursor_read_row(cursor, tuple);
02084     assert(err == DB_SUCCESS);// FIXME
02085 
02086     created_tuple= true;
02087   }
02088 
02089   err= ib_tuple_copy(update_tuple, tuple);
02090   assert(err == DB_SUCCESS);
02091 
02092   write_row_to_haildb_tuple(new_data, getTable()->getFields(), update_tuple);
02093 
02094   err= ib_cursor_update_row(cursor, tuple, update_tuple);
02095 
02096   ib_tuple_delete(update_tuple);
02097 
02098   if (created_tuple)
02099   {
02100     ib_err_t ib_err= ib_cursor_reset(cursor); //fixme check error
02101     assert(ib_err == DB_SUCCESS);
02102     tuple= ib_tuple_clear(tuple);
02103     ib_tuple_delete(tuple);
02104     tuple= NULL;
02105   }
02106 
02107   advance_cursor= true;
02108 
02109   return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02110 }
02111 
02112 int HailDBCursor::doDeleteRecord(const unsigned char *)
02113 {
02114   ib_err_t err;
02115 
02116   assert(ib_cursor_is_positioned(cursor) == IB_TRUE);
02117   err= ib_cursor_delete_row(cursor);
02118 
02119   advance_cursor= true;
02120 
02121   return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02122 }
02123 
02124 int HailDBCursor::delete_all_rows(void)
02125 {
02126   /* I *think* ib_truncate is non-transactional....
02127      so only support TRUNCATE and not DELETE FROM t;
02128      (this is what ha_innodb does)
02129   */
02130   if (getTable()->in_use->getSqlCommand() != SQLCOM_TRUNCATE)
02131     return HA_ERR_WRONG_COMMAND;
02132 
02133   ib_id_t id;
02134   ib_err_t err;
02135 
02136   ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
02137 
02138   if (cursor_is_sec_index)
02139   {
02140     err= ib_cursor_close(cursor);
02141     assert(err == DB_SUCCESS);
02142 
02143     err= ib_cursor_open_table_using_id(table_id, transaction, &cursor);
02144 
02145     if (err != DB_SUCCESS)
02146       return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02147     cursor_is_sec_index= false;
02148   }
02149   else
02150   {
02151     ib_cursor_attach_trx(cursor, transaction);
02152   }
02153 
02154   err= ib_schema_lock_exclusive(transaction);
02155   if (err != DB_SUCCESS)
02156   {
02157     ib_err_t rollback_err= ib_trx_rollback(transaction);
02158 
02159     push_warning_printf(getTable()->in_use, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
02160                         ER_CANT_DELETE_FILE,
02161                         _("Cannot Lock HailDB Data Dictionary. HailDB Error %d (%s)\n"),
02162                         err, ib_strerror(err));
02163 
02164     assert (rollback_err == DB_SUCCESS);
02165 
02166     return HA_ERR_GENERIC;
02167   }
02168 
02169   share->auto_increment_value.fetch_and_store(1);
02170 
02171   err= ib_cursor_truncate(&cursor, &id);
02172   if (err != DB_SUCCESS)
02173     goto err;
02174 
02175   ib_schema_unlock(transaction);
02176   /* ib_cursor_truncate commits on success */
02177 
02178   err= ib_cursor_open_table_using_id(id, NULL, &cursor);
02179   if (err != DB_SUCCESS)
02180     goto err;
02181 
02182   return 0;
02183 
02184 err:
02185   ib_schema_unlock(transaction);
02186   ib_err_t rollback_err= ib_trx_rollback(transaction);
02187   assert(rollback_err == DB_SUCCESS);
02188   return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02189 }
02190 
02191 int HailDBCursor::doStartTableScan(bool)
02192 {
02193   ib_err_t err= DB_SUCCESS;
02194   ib_trx_t transaction;
02195 
02196   if (in_table_scan)
02197     doEndTableScan();
02198   in_table_scan= true;
02199 
02200   transaction= *get_trx(getTable()->in_use);
02201 
02202   assert(transaction != NULL);
02203 
02204   if (cursor_is_sec_index)
02205   {
02206     err= ib_cursor_close(cursor);
02207     assert(err == DB_SUCCESS);
02208 
02209     err= ib_cursor_open_table_using_id(table_id, transaction, &cursor);
02210     cursor_is_sec_index= false;
02211   }
02212   else
02213   {
02214     ib_cursor_attach_trx(cursor, transaction);
02215   }
02216 
02217   if (err != DB_SUCCESS)
02218     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02219 
02220   err= ib_cursor_set_lock_mode(cursor, ib_lock_mode);
02221   assert(err == DB_SUCCESS); // FIXME
02222 
02223   tuple= ib_clust_read_tuple_create(cursor);
02224 
02225   err= ib_cursor_first(cursor);
02226   if (err != DB_SUCCESS && err != DB_END_OF_INDEX)
02227   {
02228     int reset_err= ib_cursor_reset(cursor);
02229     assert(reset_err == DB_SUCCESS);
02230     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02231   }
02232 
02233   advance_cursor= false;
02234 
02235   return(0);
02236 }
02237 
02238 int read_row_from_haildb(Session *session, unsigned char* buf, ib_crsr_t cursor, ib_tpl_t tuple, Table* table, bool has_hidden_primary_key, uint64_t *hidden_pkey, drizzled::memory::Root **blobroot)
02239 {
02240   ib_err_t err;
02241   ptrdiff_t row_offset= buf - table->getInsertRecord();
02242 
02243   err= ib_cursor_read_row(cursor, tuple);
02244 
02245   if (err == DB_RECORD_NOT_FOUND)
02246     return HA_ERR_END_OF_FILE;
02247   if (err != DB_SUCCESS)
02248     return ib_err_t_to_drizzle_error(session, err);
02249 
02250   int colnr= 0;
02251 
02252   /* We need the primary key for ::position() to work */
02253   if (table->getShare()->getPrimaryKey() != MAX_KEY)
02254     table->mark_columns_used_by_index_no_reset(table->getShare()->getPrimaryKey());
02255 
02256   for (Field **field= table->getFields() ; *field ; field++, colnr++)
02257   {
02258     if (! (**field).isReadSet())
02259       (**field).setReadSet(); /* Fucking broken API screws us royally. */
02260 
02261     (**field).move_field_offset(row_offset);
02262 
02263     (**field).setWriteSet();
02264 
02265     uint32_t length= ib_col_get_len(tuple, colnr);
02266     if (length == IB_SQL_NULL)
02267     {
02268       (**field).set_null();
02269       (**field).move_field_offset(-row_offset);
02270       continue;
02271     }
02272     else
02273       (**field).set_notnull();
02274 
02275     if ((**field).type() == DRIZZLE_TYPE_VARCHAR)
02276     {
02277       (*field)->store((const char*)ib_col_get_value(tuple, colnr),
02278                       length,
02279                       &my_charset_bin);
02280     }
02281     else if ((**field).type() == DRIZZLE_TYPE_DATE)
02282     {
02283       uint32_t date_read;
02284       err= ib_tuple_read_u32(tuple, colnr, &date_read);
02285       (*field)->store(date_read);
02286     }
02287     else if ((**field).type() == DRIZZLE_TYPE_BLOB)
02288     {
02289       if (blobroot == NULL)
02290         (reinterpret_cast<Field_blob*>(*field))->set_ptr(length,
02291                                       (unsigned char*)ib_col_get_value(tuple,
02292                                                                        colnr));
02293       else
02294       {
02295         if (*blobroot == NULL)
02296         {
02297           *blobroot= new drizzled::memory::Root();
02298           (**blobroot).init_alloc_root();
02299         }
02300 
02301         unsigned char *blob_ptr= (unsigned char*)(**blobroot).alloc_root(length);
02302         memcpy(blob_ptr, ib_col_get_value(tuple, colnr), length);
02303         (reinterpret_cast<Field_blob*>(*field))->set_ptr(length, blob_ptr);
02304       }
02305     }
02306     else
02307     {
02308       ib_col_copy_value(tuple, colnr, (*field)->ptr, (*field)->data_length());
02309     }
02310 
02311     (**field).move_field_offset(-row_offset);
02312 
02313     if (err != DB_SUCCESS)
02314       return ib_err_t_to_drizzle_error(session, err);
02315   }
02316 
02317   if (has_hidden_primary_key)
02318   {
02319     err= ib_tuple_read_u64(tuple, colnr, hidden_pkey);
02320   }
02321 
02322   return ib_err_t_to_drizzle_error(session, err);
02323 }
02324 
02325 int HailDBCursor::rnd_next(unsigned char *buf)
02326 {
02327   ib_err_t err;
02328   int ret;
02329 
02330   if (advance_cursor)
02331   {
02332     err= ib_cursor_next(cursor);
02333     if (err != DB_SUCCESS)
02334       return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02335   }
02336 
02337   tuple= ib_tuple_clear(tuple);
02338   ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02339                             getTable(),
02340                             share->has_hidden_primary_key,
02341                             &hidden_autoinc_pkey_position);
02342 
02343   advance_cursor= true;
02344   return ret;
02345 }
02346 
02347 int HailDBCursor::doEndTableScan()
02348 {
02349   ib_err_t err;
02350 
02351   ib_tuple_delete(tuple);
02352   tuple= NULL;
02353   err= ib_cursor_reset(cursor);
02354   assert(err == DB_SUCCESS);
02355   in_table_scan= false;
02356   return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02357 }
02358 
02359 int HailDBCursor::rnd_pos(unsigned char *buf, unsigned char *pos)
02360 {
02361   ib_err_t err;
02362   int res;
02363   int ret= 0;
02364   ib_tpl_t search_tuple= ib_clust_search_tuple_create(cursor);
02365 
02366   if (share->has_hidden_primary_key)
02367   {
02368     err= ib_col_set_value(search_tuple, 0,
02369                           ((uint64_t*)(pos)), sizeof(uint64_t));
02370     if (err != DB_SUCCESS)
02371       return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02372   }
02373   else
02374   {
02375     unsigned int keynr;
02376     if (getTable()->getShare()->getPrimaryKey() != MAX_KEY)
02377       keynr= getTable()->getShare()->getPrimaryKey();
02378     else
02379       keynr= get_first_unique_index(*getTable());
02380 
02381     fill_ib_search_tpl_from_drizzle_key(search_tuple,
02382                                         getTable()->key_info + keynr,
02383                                         pos, ref_length);
02384   }
02385 
02386   err= ib_cursor_moveto(cursor, search_tuple, IB_CUR_GE, &res);
02387   if (err != DB_SUCCESS)
02388     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02389 
02390   assert(res==0);
02391   if (res != 0)
02392     ret= -1;
02393 
02394   ib_tuple_delete(search_tuple);
02395 
02396   tuple= ib_tuple_clear(tuple);
02397 
02398   if (ret == 0)
02399     ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02400                               getTable(),
02401                               share->has_hidden_primary_key,
02402                               &hidden_autoinc_pkey_position);
02403 
02404   advance_cursor= true;
02405 
02406   return(ret);
02407 }
02408 
02409 static void store_key_value_from_haildb(KeyInfo *key_info, unsigned char* ref, int ref_len, const unsigned char *record)
02410 {
02411   KeyPartInfo* key_part= key_info->key_part;
02412   KeyPartInfo* end= key_info->key_part + key_info->key_parts;
02413   unsigned char* ref_start= ref;
02414 
02415   memset(ref, 0, ref_len);
02416 
02417   for (; key_part != end; key_part++)
02418   {
02419     char is_null= 0;
02420 
02421     if(key_part->null_bit)
02422     {
02423       *ref= is_null= record[key_part->null_offset] & key_part->null_bit;
02424       ref++;
02425     }
02426 
02427     Field *field= key_part->field;
02428 
02429     if (field->type() == DRIZZLE_TYPE_VARCHAR)
02430     {
02431       if (is_null)
02432       {
02433         ref+= key_part->length + 2; /* 2 bytes for length */
02434         continue;
02435       }
02436 
02437       String str;
02438       field->val_str_internal(&str);
02439 
02440       *ref++= (char)(str.length() & 0x000000ff);
02441       *ref++= (char)((str.length()>>8) & 0x000000ff);
02442 
02443       memcpy(ref, str.ptr(), str.length());
02444       ref+= key_part->length;
02445     }
02446     // FIXME: blobs.
02447     else
02448     {
02449       if (is_null)
02450       {
02451         ref+= key_part->length;
02452         continue;
02453       }
02454 
02455       memcpy(ref, record+key_part->offset, key_part->length);
02456       ref+= key_part->length;
02457     }
02458 
02459   }
02460 
02461   assert(ref == ref_start + ref_len);
02462 }
02463 
02464 void HailDBCursor::position(const unsigned char *record)
02465 {
02466   if (share->has_hidden_primary_key)
02467     *((uint64_t*) ref)= hidden_autoinc_pkey_position;
02468   else
02469   {
02470     unsigned int keynr;
02471     if (getTable()->getShare()->getPrimaryKey() != MAX_KEY)
02472       keynr= getTable()->getShare()->getPrimaryKey();
02473     else
02474       keynr= get_first_unique_index(*getTable());
02475 
02476     store_key_value_from_haildb(getTable()->key_info + keynr,
02477                                 ref, ref_length, record);
02478   }
02479 
02480   return;
02481 }
02482 
02483 double HailDBCursor::scan_time()
02484 {
02485   ib_table_stats_t table_stats;
02486   ib_err_t err;
02487 
02488   err= ib_get_table_statistics(cursor, &table_stats, sizeof(table_stats));
02489 
02490   /* Approximate I/O seeks for full table scan */
02491   return (double) (table_stats.stat_clustered_index_size / 16384);
02492 }
02493 
02494 int HailDBCursor::info(uint32_t flag)
02495 {
02496   ib_table_stats_t table_stats;
02497   ib_err_t err;
02498 
02499   if (flag & HA_STATUS_VARIABLE)
02500   {
02501     err= ib_get_table_statistics(cursor, &table_stats, sizeof(table_stats));
02502 
02503     stats.records= table_stats.stat_n_rows;
02504 
02505     if (table_stats.stat_n_rows < 2)
02506       stats.records= 2;
02507 
02508     stats.deleted= 0;
02509     stats.data_file_length= table_stats.stat_clustered_index_size;
02510     stats.index_file_length= table_stats.stat_sum_of_other_index_sizes;
02511 
02512     stats.mean_rec_length= stats.data_file_length / stats.records;
02513   }
02514 
02515   if (flag & HA_STATUS_AUTO)
02516     stats.auto_increment_value= 1;
02517 
02518   if (flag & HA_STATUS_ERRKEY) {
02519     const char *err_table_name;
02520     const char *err_index_name;
02521 
02522     ib_trx_t transaction= *get_trx(getTable()->in_use);
02523 
02524     err= ib_get_duplicate_key(transaction, &err_table_name, &err_index_name);
02525 
02526     errkey= UINT32_MAX;
02527 
02528     for (unsigned int i = 0; i < getTable()->getShare()->keys; i++)
02529     {
02530       if (strcmp(err_index_name, getTable()->key_info[i].name) == 0)
02531       {
02532         errkey= i;
02533         break;
02534       }
02535     }
02536 
02537   }
02538 
02539   if (flag & HA_STATUS_CONST)
02540   {
02541     for (unsigned int i = 0; i < getTable()->getShare()->sizeKeys(); i++)
02542     {
02543       const char* index_name= getTable()->key_info[i].name;
02544       uint64_t ncols;
02545       int64_t *n_diff;
02546       ha_rows rec_per_key;
02547 
02548       err= ib_get_index_stat_n_diff_key_vals(cursor, index_name,
02549                                              &ncols, &n_diff);
02550 
02551       if (err != DB_SUCCESS)
02552         return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02553 
02554       for (unsigned int j=0; j < getTable()->key_info[i].key_parts; j++)
02555       {
02556         if (n_diff[j+1] == 0)
02557           rec_per_key= stats.records;
02558         else
02559           rec_per_key= stats.records / n_diff[j+1];
02560 
02561         /* We import this heuristic from ha_innodb, which says
02562            that MySQL favours table scans too much over index searches,
02563            so we pretend our index selectivity is 2 times better. */
02564 
02565         rec_per_key= rec_per_key / 2;
02566 
02567         if (rec_per_key == 0)
02568           rec_per_key= 1;
02569 
02570         getTable()->key_info[i].rec_per_key[j]=
02571           rec_per_key >= ~(ulong) 0 ? ~(ulong) 0 :
02572           (ulong) rec_per_key;
02573       }
02574 
02575       free(n_diff);
02576     }
02577   }
02578 
02579   return(0);
02580 }
02581 
02582 int HailDBCursor::doStartIndexScan(uint32_t keynr, bool)
02583 {
02584   ib_err_t err;
02585   ib_trx_t transaction= *get_trx(getTable()->in_use);
02586 
02587   active_index= keynr;
02588 
02589   if (active_index == 0 && ! share->has_hidden_primary_key)
02590   {
02591     if (cursor_is_sec_index)
02592     {
02593       err= ib_cursor_close(cursor);
02594       assert(err == DB_SUCCESS);
02595 
02596       err= ib_cursor_open_table_using_id(table_id, transaction, &cursor);
02597 
02598       if (err != DB_SUCCESS)
02599         return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02600 
02601     }
02602     else
02603     {
02604       ib_cursor_attach_trx(cursor, transaction);
02605     }
02606 
02607     cursor_is_sec_index= false;
02608     tuple= ib_clust_read_tuple_create(cursor);
02609   }
02610   else
02611   {
02612     ib_id_t index_id;
02613     err= ib_index_get_id(table_path_to_haildb_name(getShare()->getPath()),
02614                          getShare()->getKeyInfo(keynr).name,
02615                          &index_id);
02616     if (err != DB_SUCCESS)
02617       return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02618 
02619     err= ib_cursor_close(cursor);
02620     assert(err == DB_SUCCESS);
02621 
02622     err= ib_cursor_open_index_using_id(index_id, transaction, &cursor);
02623 
02624     if (err != DB_SUCCESS)
02625       return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02626 
02627     cursor_is_sec_index= true;
02628 
02629     tuple= ib_clust_read_tuple_create(cursor);
02630     ib_cursor_set_cluster_access(cursor);
02631   }
02632 
02633   err= ib_cursor_set_lock_mode(cursor, ib_lock_mode);
02634   assert(err == DB_SUCCESS);
02635 
02636   advance_cursor= false;
02637   return 0;
02638 }
02639 
02640 static ib_srch_mode_t ha_rkey_function_to_ib_srch_mode(drizzled::ha_rkey_function find_flag)
02641 {
02642   switch (find_flag)
02643   {
02644   case HA_READ_KEY_EXACT:
02645     return IB_CUR_GE;
02646   case HA_READ_KEY_OR_NEXT:
02647     return IB_CUR_GE;
02648   case HA_READ_KEY_OR_PREV:
02649     return IB_CUR_LE;
02650   case HA_READ_AFTER_KEY:
02651     return IB_CUR_G;
02652   case HA_READ_BEFORE_KEY:
02653     return IB_CUR_L;
02654   case HA_READ_PREFIX:
02655     return IB_CUR_GE;
02656   case HA_READ_PREFIX_LAST:
02657     return IB_CUR_LE;
02658   case HA_READ_PREFIX_LAST_OR_PREV:
02659     return IB_CUR_LE;
02660   case HA_READ_MBR_CONTAIN:
02661   case HA_READ_MBR_INTERSECT:
02662   case HA_READ_MBR_WITHIN:
02663   case HA_READ_MBR_DISJOINT:
02664   case HA_READ_MBR_EQUAL:
02665     assert(false); /* these just exist in the enum, not used. */
02666   }
02667 
02668   assert(false);
02669   /* Must return or compiler complains about reaching end of function */
02670   return (ib_srch_mode_t)0;
02671 }
02672 
02673 static void fill_ib_search_tpl_from_drizzle_key(ib_tpl_t search_tuple,
02674                                                 const drizzled::KeyInfo *key_info,
02675                                                 const unsigned char *key_ptr,
02676                                                 uint32_t key_len)
02677 {
02678   KeyPartInfo *key_part= key_info->key_part;
02679   KeyPartInfo *end= key_part + key_info->key_parts;
02680   const unsigned char *buff= key_ptr;
02681   ib_err_t err;
02682 
02683   int fieldnr= 0;
02684 
02685   for(; key_part != end && buff < key_ptr + key_len; key_part++)
02686   {
02687     Field *field= key_part->field;
02688     bool is_null= false;
02689 
02690     if (key_part->null_bit)
02691     {
02692       is_null= *buff;
02693       if (is_null)
02694       {
02695         err= ib_col_set_value(search_tuple, fieldnr, NULL, IB_SQL_NULL);
02696         assert(err == DB_SUCCESS);
02697       }
02698       buff++;
02699     }
02700 
02701     if (field->type() == DRIZZLE_TYPE_VARCHAR)
02702     {
02703       if (is_null)
02704       {
02705         buff+= key_part->length + 2; /* 2 bytes length */
02706         continue;
02707       }
02708 
02709       int length= *buff + (*(buff + 1) << 8);
02710       buff+=2;
02711       err= ib_col_set_value(search_tuple, fieldnr, buff, length);
02712       assert(err == DB_SUCCESS);
02713 
02714       buff+= key_part->length;
02715     }
02716     else if (field->type() == DRIZZLE_TYPE_DATE)
02717     {
02718       uint32_t date_int= static_cast<uint32_t>(field->val_int());
02719       err= ib_col_set_value(search_tuple, fieldnr, &date_int, 4);
02720       buff+= key_part->length;
02721     }
02722     // FIXME: BLOBs
02723     else
02724     {
02725       if (is_null)
02726       {
02727         buff+= key_part->length;
02728         continue;
02729       }
02730 
02731       err= ib_col_set_value(search_tuple, fieldnr,
02732                             buff, key_part->length);
02733       assert(err == DB_SUCCESS);
02734 
02735       buff+= key_part->length;
02736     }
02737 
02738     fieldnr++;
02739   }
02740 
02741   assert(buff == key_ptr + key_len);
02742 }
02743 
02744 int HailDBCursor::haildb_index_read(unsigned char *buf,
02745                                             const unsigned char *key_ptr,
02746                                             uint32_t key_len,
02747                                             drizzled::ha_rkey_function find_flag,
02748                                             bool allocate_blobs)
02749 {
02750   ib_tpl_t search_tuple;
02751   int res;
02752   ib_err_t err;
02753   int ret;
02754   ib_srch_mode_t search_mode;
02755 
02756   search_mode= ha_rkey_function_to_ib_srch_mode(find_flag);
02757 
02758   if (active_index == 0 && ! share->has_hidden_primary_key)
02759     search_tuple= ib_clust_search_tuple_create(cursor);
02760   else
02761     search_tuple= ib_sec_search_tuple_create(cursor);
02762 
02763   fill_ib_search_tpl_from_drizzle_key(search_tuple,
02764                                       getTable()->key_info + active_index,
02765                                       key_ptr, key_len);
02766 
02767   err= ib_cursor_moveto(cursor, search_tuple, search_mode, &res);
02768   ib_tuple_delete(search_tuple);
02769 
02770   if ((err == DB_RECORD_NOT_FOUND || err == DB_END_OF_INDEX))
02771   {
02772     getTable()->status= STATUS_NOT_FOUND;
02773     return HA_ERR_KEY_NOT_FOUND;
02774   }
02775 
02776   if (err != DB_SUCCESS)
02777   {
02778     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02779   }
02780 
02781   tuple= ib_tuple_clear(tuple);
02782   ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02783                             getTable(),
02784                             share->has_hidden_primary_key,
02785                             &hidden_autoinc_pkey_position,
02786                             (allocate_blobs)? &blobroot : NULL);
02787   if (ret == 0)
02788     getTable()->status= 0;
02789   else
02790     getTable()->status= STATUS_NOT_FOUND;
02791 
02792   advance_cursor= true;
02793 
02794   return ret;
02795 }
02796 
02797 int HailDBCursor::index_read(unsigned char *buf,
02798                                      const unsigned char *key_ptr,
02799                                      uint32_t key_len,
02800                                      drizzled::ha_rkey_function find_flag)
02801 {
02802   return haildb_index_read(buf, key_ptr, key_len, find_flag, false);
02803 }
02804 
02805 /* This is straight from cursor.cc, but it's private there :( */
02806 uint32_t HailDBCursor::calculate_key_len(uint32_t key_position,
02807                                                  key_part_map keypart_map_arg)
02808 {
02809   /* works only with key prefixes */
02810   assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
02811 
02812   KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
02813   KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
02814   uint32_t length= 0;
02815 
02816   while (key_part_found < end_key_part_found && keypart_map_arg)
02817   {
02818     length+= key_part_found->store_length;
02819     keypart_map_arg >>= 1;
02820     key_part_found++;
02821   }
02822   return length;
02823 }
02824 
02825 
02826 int HailDBCursor::haildb_index_read_map(unsigned char * buf,
02827                                                 const unsigned char *key,
02828                                                 key_part_map keypart_map,
02829                                                 enum ha_rkey_function find_flag,
02830                                                 bool allocate_blobs)
02831 {
02832   uint32_t key_len= calculate_key_len(active_index, keypart_map);
02833   return  haildb_index_read(buf, key, key_len, find_flag, allocate_blobs);
02834 }
02835 
02836 int HailDBCursor::index_read_idx_map(unsigned char * buf,
02837                                              uint32_t index,
02838                                              const unsigned char * key,
02839                                              key_part_map keypart_map,
02840                                              enum ha_rkey_function find_flag)
02841 {
02842   int error, error1;
02843   error= doStartIndexScan(index, 0);
02844   if (!error)
02845   {
02846     error= haildb_index_read_map(buf, key, keypart_map, find_flag, true);
02847     error1= doEndIndexScan();
02848   }
02849   return error ?  error : error1;
02850 }
02851 
02852 int HailDBCursor::reset()
02853 {
02854   if (blobroot)
02855     blobroot->free_root(MYF(0));
02856 
02857   return 0;
02858 }
02859 
02860 int HailDBCursor::analyze(Session*)
02861 {
02862   ib_err_t err;
02863 
02864   err= ib_update_table_statistics(cursor);
02865 
02866   return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02867 }
02868 
02869 int HailDBCursor::index_next(unsigned char *buf)
02870 {
02871   int ret= HA_ERR_END_OF_FILE;
02872 
02873   if (advance_cursor)
02874   {
02875     ib_err_t err= ib_cursor_next(cursor);
02876     if (err == DB_END_OF_INDEX)
02877       return HA_ERR_END_OF_FILE;
02878   }
02879 
02880   tuple= ib_tuple_clear(tuple);
02881   ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02882                             getTable(),
02883                             share->has_hidden_primary_key,
02884                             &hidden_autoinc_pkey_position);
02885 
02886   advance_cursor= true;
02887   return ret;
02888 }
02889 
02890 int HailDBCursor::doEndIndexScan()
02891 {
02892   active_index= MAX_KEY;
02893 
02894   return doEndTableScan();
02895 }
02896 
02897 int HailDBCursor::index_prev(unsigned char *buf)
02898 {
02899   int ret= HA_ERR_END_OF_FILE;
02900   ib_err_t err;
02901 
02902   if (advance_cursor)
02903   {
02904     err= ib_cursor_prev(cursor);
02905     if (err != DB_SUCCESS)
02906     {
02907       if (err == DB_END_OF_INDEX)
02908         return HA_ERR_END_OF_FILE;
02909       else
02910         return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02911     }
02912   }
02913 
02914   tuple= ib_tuple_clear(tuple);
02915   ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02916                             getTable(),
02917                             share->has_hidden_primary_key,
02918                             &hidden_autoinc_pkey_position);
02919 
02920   advance_cursor= true;
02921 
02922   return ret;
02923 }
02924 
02925 
02926 int HailDBCursor::index_first(unsigned char *buf)
02927 {
02928   int ret= HA_ERR_END_OF_FILE;
02929   ib_err_t err;
02930 
02931   err= ib_cursor_first(cursor);
02932   if (err != DB_SUCCESS)
02933     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02934 
02935   tuple= ib_tuple_clear(tuple);
02936   ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02937                             getTable(),
02938                             share->has_hidden_primary_key,
02939                             &hidden_autoinc_pkey_position);
02940 
02941   advance_cursor= true;
02942 
02943   return ret;
02944 }
02945 
02946 
02947 int HailDBCursor::index_last(unsigned char *buf)
02948 {
02949   int ret= HA_ERR_END_OF_FILE;
02950   ib_err_t err;
02951 
02952   err= ib_cursor_last(cursor);
02953   if (err != DB_SUCCESS)
02954     return ib_err_t_to_drizzle_error(getTable()->getSession(), err);
02955 
02956   tuple= ib_tuple_clear(tuple);
02957   ret= read_row_from_haildb(getTable()->getSession(), buf, cursor, tuple,
02958                             getTable(),
02959                             share->has_hidden_primary_key,
02960                             &hidden_autoinc_pkey_position);
02961   advance_cursor= true;
02962 
02963   return ret;
02964 }
02965 
02966 int HailDBCursor::extra(enum ha_extra_function operation)
02967 {
02968   switch (operation)
02969   {
02970   case HA_EXTRA_FLUSH:
02971     if (blobroot)
02972       blobroot->free_root(MYF(0));
02973     break;
02974   case HA_EXTRA_WRITE_CAN_REPLACE:
02975     write_can_replace= true;
02976     break;
02977   case HA_EXTRA_WRITE_CANNOT_REPLACE:
02978     write_can_replace= false;
02979     break;
02980   default:
02981     break;
02982   }
02983 
02984   return 0;
02985 }
02986 
02987 static int create_table_message_table()
02988 {
02989   ib_tbl_sch_t schema;
02990   ib_idx_sch_t index_schema;
02991   ib_trx_t transaction;
02992   ib_id_t table_id;
02993   ib_err_t err, rollback_err;
02994   ib_bool_t create_db_err;
02995 
02996   create_db_err= ib_database_create("data_dictionary");
02997   if (create_db_err != IB_TRUE)
02998     return -1;
02999 
03000   err= ib_table_schema_create(HAILDB_TABLE_DEFINITIONS_TABLE, &schema,
03001                               IB_TBL_COMPACT, 0);
03002   if (err != DB_SUCCESS)
03003     return err;
03004 
03005   err= ib_table_schema_add_col(schema, "table_name", IB_VARCHAR, IB_COL_NONE, 0,
03006                                IB_MAX_TABLE_NAME_LEN);
03007   if (err != DB_SUCCESS)
03008     goto free_err;
03009 
03010   err= ib_table_schema_add_col(schema, "message", IB_BLOB, IB_COL_NONE, 0, 0);
03011   if (err != DB_SUCCESS)
03012     goto free_err;
03013 
03014   err= ib_table_schema_add_index(schema, "PRIMARY_KEY", &index_schema);
03015   if (err != DB_SUCCESS)
03016     goto free_err;
03017 
03018   err= ib_index_schema_add_col(index_schema, "table_name", 0);
03019   if (err != DB_SUCCESS)
03020     goto free_err;
03021   err= ib_index_schema_set_clustered(index_schema);
03022   if (err != DB_SUCCESS)
03023     goto free_err;
03024 
03025   transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
03026   err= ib_schema_lock_exclusive(transaction);
03027   if (err != DB_SUCCESS)
03028     goto rollback;
03029 
03030   err= ib_table_create(transaction, schema, &table_id);
03031   if (err != DB_SUCCESS)
03032     goto rollback;
03033 
03034   err= ib_trx_commit(transaction);
03035   if (err != DB_SUCCESS)
03036     goto rollback;
03037 
03038   ib_table_schema_delete(schema);
03039 
03040   return 0;
03041 rollback:
03042   ib_schema_unlock(transaction);
03043   rollback_err= ib_trx_rollback(transaction);
03044   assert(rollback_err == DB_SUCCESS);
03045 free_err:
03046   ib_table_schema_delete(schema);
03047   return err;
03048 }
03049 
03050 static bool innobase_use_doublewrite= true;
03051 static bool srv_file_per_table= false;
03052 static bool innobase_adaptive_hash_index;
03053 static bool srv_adaptive_flushing;
03054 static bool innobase_print_verbose_log;
03055 static bool innobase_rollback_on_timeout;
03056 static bool innobase_create_status_file;
03057 static bool srv_use_sys_malloc;
03058 static string innobase_file_format_name;
03059 typedef constrained_check<unsigned int, 1000, 1> autoextend_constraint;
03060 static autoextend_constraint srv_auto_extend_increment;
03061 typedef constrained_check<size_t, SIZE_MAX, 5242880, 1048576> buffer_pool_constraint;
03062 static buffer_pool_constraint innobase_buffer_pool_size;
03063 typedef constrained_check<size_t, SIZE_MAX, 512, 1024> additional_mem_pool_constraint;
03064 static additional_mem_pool_constraint innobase_additional_mem_pool_size;
03065 static bool  innobase_use_checksums= true;
03066 typedef constrained_check<unsigned int, UINT_MAX, 100> io_capacity_constraint;
03067 typedef constrained_check<uint32_t, 2, 0> trinary_constraint;
03068 static trinary_constraint innobase_fast_shutdown;
03069 static trinary_constraint srv_flush_log_at_trx_commit;
03070 typedef constrained_check<uint32_t, 6, 0> force_recovery_constraint;
03071 static force_recovery_constraint innobase_force_recovery;
03072 typedef constrained_check<int64_t, INT64_MAX, 1024*1024, 1024*1024> log_file_constraint;
03073 static log_file_constraint haildb_log_file_size;
03074 
03075 static io_capacity_constraint srv_io_capacity;
03076 typedef constrained_check<unsigned int, 100, 2> log_files_in_group_constraint;
03077 static log_files_in_group_constraint haildb_log_files_in_group;
03078 typedef constrained_check<unsigned int, 1024*1024*1024, 1> lock_wait_constraint;
03079 static lock_wait_constraint innobase_lock_wait_timeout;
03080 typedef constrained_check<long, LONG_MAX, 256*1024, 1024> log_buffer_size_constraint;
03081 static log_buffer_size_constraint innobase_log_buffer_size;
03082 typedef constrained_check<unsigned int, 97, 5> lru_old_blocks_constraint;
03083 static lru_old_blocks_constraint innobase_lru_old_blocks_pct;
03084 typedef constrained_check<unsigned int, 99, 0> max_dirty_pages_constraint;
03085 static max_dirty_pages_constraint haildb_max_dirty_pages_pct;
03086 static uint64_constraint haildb_max_purge_lag;
03087 static uint64_constraint haildb_sync_spin_loops;
03088 typedef constrained_check<uint32_t, UINT32_MAX, 10> open_files_constraint;
03089 static open_files_constraint haildb_open_files;
03090 typedef constrained_check<unsigned int, 64, 1> io_threads_constraint;
03091 static io_threads_constraint haildb_read_io_threads;
03092 static io_threads_constraint haildb_write_io_threads;
03093 
03094 
03095 static uint32_t innobase_lru_block_access_recency;
03096 
03097 
03098 
03099 static int haildb_file_format_name_validate(Session*, set_var *var)
03100 {
03101 
03102   const char *format= var->value->str_value.ptr();
03103   if (format == NULL)
03104     return 1;
03105 
03106   ib_err_t err= ib_cfg_set_text("file_format", format);
03107 
03108   if (err == DB_SUCCESS)
03109   {
03110     innobase_file_format_name= format;
03111     return 0;
03112   }
03113   else
03114     return 1;
03115 }
03116 
03117 static void haildb_lru_old_blocks_pct_update(Session*, sql_var_t)
03118 {
03119   int ret= ib_cfg_set_int("lru_old_blocks_pct", static_cast<uint32_t>(innobase_lru_old_blocks_pct));
03120   (void)ret;
03121 }
03122 
03123 static void haildb_lru_block_access_recency_update(Session*, sql_var_t)
03124 {
03125   int ret= ib_cfg_set_int("lru_block_access_recency", static_cast<uint32_t>(innobase_lru_block_access_recency));
03126   (void)ret;
03127 }
03128 
03129 static void haildb_status_file_update(Session*, sql_var_t)
03130 {
03131   ib_err_t err;
03132 
03133   if (innobase_create_status_file)
03134     err= ib_cfg_set_bool_on("status_file");
03135   else
03136     err= ib_cfg_set_bool_off("status_file");
03137   (void)err;
03138 }
03139 
03140 extern "C" int haildb_errmsg_callback(ib_msg_stream_t, const char *fmt, ...);
03141 namespace drizzled
03142 {
03143 extern bool volatile shutdown_in_progress;
03144 }
03145 
03146 extern "C" int haildb_errmsg_callback(ib_msg_stream_t, const char *fmt, ...)
03147 {
03148   bool r= false;
03149   va_list args;
03150   va_start(args, fmt);
03151   if (not shutdown_in_progress)
03152   {
03153     r= plugin::ErrorMessage::vprintf(error::WARN, fmt, args);
03154   }
03155   else
03156   {
03157     vfprintf(stderr, fmt, args);
03158   }
03159   va_end(args);
03160 
03161   return (! r==true);
03162 }
03163 
03164 static int haildb_init(drizzled::module::Context &context)
03165 {
03166   haildb_system_table_names.insert(std::string("HAILDB_SYS_TABLES"));
03167   haildb_system_table_names.insert(std::string("HAILDB_SYS_COLUMNS"));
03168   haildb_system_table_names.insert(std::string("HAILDB_SYS_INDEXES"));
03169   haildb_system_table_names.insert(std::string("HAILDB_SYS_FIELDS"));
03170   haildb_system_table_names.insert(std::string("HAILDB_SYS_FOREIGN"));
03171   haildb_system_table_names.insert(std::string("HAILDB_SYS_FOREIGN_COLS"));
03172 
03173   const module::option_map &vm= context.getOptions();
03174 
03175   /* Inverted Booleans */
03176 
03177   innobase_adaptive_hash_index= (vm.count("disable-adaptive-hash-index")) ? false : true;
03178   srv_adaptive_flushing= (vm.count("disable-adaptive-flushing")) ? false : true;
03179   innobase_use_checksums= (vm.count("disable-checksums")) ? false : true;
03180   innobase_use_doublewrite= (vm.count("disable-doublewrite")) ? false : true;
03181   innobase_print_verbose_log= (vm.count("disable-print-verbose-log")) ? false : true;
03182   srv_use_sys_malloc= (vm.count("use-internal-malloc")) ? false : true;
03183 
03184 
03185   ib_err_t err;
03186 
03187   err= ib_init();
03188   if (err != DB_SUCCESS)
03189     goto haildb_error;
03190 
03191   ib_logger_set(haildb_errmsg_callback, NULL);
03192 
03193   if (not vm["data-home-dir"].as<string>().empty())
03194   {
03195     err= ib_cfg_set_text("data_home_dir", vm["data-home-dir"].as<string>().c_str());
03196     if (err != DB_SUCCESS)
03197       goto haildb_error;
03198   }
03199 
03200   if (vm.count("log-group-home-dir"))
03201   {
03202     err= ib_cfg_set_text("log_group_home_dir", vm["log-group-home-dir"].as<string>().c_str());
03203     if (err != DB_SUCCESS)
03204       goto haildb_error;
03205   }
03206 
03207   if (innobase_print_verbose_log)
03208     err= ib_cfg_set_bool_on("print_verbose_log");
03209   else
03210     err= ib_cfg_set_bool_off("print_verbose_log");
03211 
03212   if (err != DB_SUCCESS)
03213     goto haildb_error;
03214 
03215   if (innobase_rollback_on_timeout)
03216     err= ib_cfg_set_bool_on("rollback_on_timeout");
03217   else
03218     err= ib_cfg_set_bool_off("rollback_on_timeout");
03219 
03220   if (err != DB_SUCCESS)
03221     goto haildb_error;
03222 
03223   if (innobase_use_doublewrite)
03224     err= ib_cfg_set_bool_on("doublewrite");
03225   else
03226     err= ib_cfg_set_bool_off("doublewrite");
03227 
03228   if (err != DB_SUCCESS)
03229     goto haildb_error;
03230 
03231   if (innobase_adaptive_hash_index)
03232     err= ib_cfg_set_bool_on("adaptive_hash_index");
03233   else
03234     err= ib_cfg_set_bool_off("adaptive_hash_index");
03235 
03236   if (err != DB_SUCCESS)
03237     goto haildb_error;
03238 
03239   if (srv_adaptive_flushing)
03240     err= ib_cfg_set_bool_on("adaptive_flushing");
03241   else
03242     err= ib_cfg_set_bool_off("adaptive_flushing");
03243 
03244   if (err != DB_SUCCESS)
03245     goto haildb_error;
03246 
03247   err= ib_cfg_set_int("additional_mem_pool_size", innobase_additional_mem_pool_size.get());
03248   if (err != DB_SUCCESS)
03249     goto haildb_error;
03250 
03251   err= ib_cfg_set_int("autoextend_increment", srv_auto_extend_increment.get());
03252   if (err != DB_SUCCESS)
03253     goto haildb_error;
03254 
03255   err= ib_cfg_set_int("buffer_pool_size", innobase_buffer_pool_size.get());
03256   if (err != DB_SUCCESS)
03257     goto haildb_error;
03258 
03259   err= ib_cfg_set_int("io_capacity", srv_io_capacity.get());
03260   if (err != DB_SUCCESS)
03261     goto haildb_error;
03262 
03263   if (srv_file_per_table)
03264     err= ib_cfg_set_bool_on("file_per_table");
03265   else
03266     err= ib_cfg_set_bool_off("file_per_table");
03267 
03268   if (err != DB_SUCCESS)
03269     goto haildb_error;
03270 
03271   err= ib_cfg_set_int("flush_log_at_trx_commit",
03272                       srv_flush_log_at_trx_commit.get());
03273   if (err != DB_SUCCESS)
03274     goto haildb_error;
03275 
03276   if (vm.count("flush-method") != 0)
03277   {
03278     err= ib_cfg_set_text("flush_method", 
03279                          vm["flush-method"].as<string>().c_str());
03280     if (err != DB_SUCCESS)
03281       goto haildb_error;
03282   }
03283 
03284   err= ib_cfg_set_int("force_recovery",
03285                       innobase_force_recovery.get());
03286   if (err != DB_SUCCESS)
03287     goto haildb_error;
03288 
03289   err= ib_cfg_set_text("data_file_path", vm["data-file-path"].as<string>().c_str());
03290   if (err != DB_SUCCESS)
03291     goto haildb_error;
03292 
03293   err= ib_cfg_set_int("log_file_size", haildb_log_file_size.get());
03294   if (err != DB_SUCCESS)
03295     goto haildb_error;
03296 
03297   err= ib_cfg_set_int("log_buffer_size", innobase_log_buffer_size.get());
03298   if (err != DB_SUCCESS)
03299     goto haildb_error;
03300 
03301   err= ib_cfg_set_int("log_files_in_group", haildb_log_files_in_group.get());
03302   if (err != DB_SUCCESS)
03303     goto haildb_error;
03304 
03305   err= ib_cfg_set_int("checksums", innobase_use_checksums);
03306   if (err != DB_SUCCESS)
03307     goto haildb_error;
03308 
03309   err= ib_cfg_set_int("lock_wait_timeout", innobase_lock_wait_timeout.get());
03310   if (err != DB_SUCCESS)
03311     goto haildb_error;
03312 
03313   err= ib_cfg_set_int("max_dirty_pages_pct", haildb_max_dirty_pages_pct.get());
03314   if (err != DB_SUCCESS)
03315     goto haildb_error;
03316 
03317   err= ib_cfg_set_int("max_purge_lag", haildb_max_purge_lag.get());
03318   if (err != DB_SUCCESS)
03319     goto haildb_error;
03320 
03321   err= ib_cfg_set_int("open_files", haildb_open_files.get());
03322   if (err != DB_SUCCESS)
03323     goto haildb_error;
03324 
03325   err= ib_cfg_set_int("read_io_threads", haildb_read_io_threads.get());
03326   if (err != DB_SUCCESS)
03327     goto haildb_error;
03328 
03329   err= ib_cfg_set_int("write_io_threads", haildb_write_io_threads.get());
03330   if (err != DB_SUCCESS)
03331     goto haildb_error;
03332 
03333   err= ib_cfg_set_int("sync_spin_loops", haildb_sync_spin_loops.get());
03334   if (err != DB_SUCCESS)
03335     goto haildb_error;
03336 
03337   if (srv_use_sys_malloc)
03338     err= ib_cfg_set_bool_on("use_sys_malloc");
03339   else
03340     err= ib_cfg_set_bool_off("use_sys_malloc");
03341 
03342   if (err != DB_SUCCESS)
03343     goto haildb_error;
03344 
03345   err= ib_startup(innobase_file_format_name.c_str());
03346   if (err != DB_SUCCESS)
03347     goto haildb_error;
03348 
03349   create_table_message_table();
03350 
03351   haildb_engine= new HailDBEngine("InnoDB");
03352   context.add(haildb_engine);
03353   context.registerVariable(new sys_var_bool_ptr_readonly("adaptive_hash_index",
03354                                                          &innobase_adaptive_hash_index));
03355   context.registerVariable(new sys_var_bool_ptr_readonly("adaptive_flushing",
03356                                                          &srv_adaptive_flushing));
03357   context.registerVariable(new sys_var_constrained_value_readonly<size_t>("additional_mem_pool_size",innobase_additional_mem_pool_size));
03358   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("autoextend_increment", srv_auto_extend_increment));
03359   context.registerVariable(new sys_var_constrained_value_readonly<size_t>("buffer_pool_size", innobase_buffer_pool_size));
03360   context.registerVariable(new sys_var_bool_ptr_readonly("checksums",
03361                                                          &innobase_use_checksums));
03362   context.registerVariable(new sys_var_bool_ptr_readonly("doublewrite",
03363                                                          &innobase_use_doublewrite));
03364   context.registerVariable(new sys_var_const_string_val("data_file_path",
03365                                                 vm["data-file-path"].as<string>()));
03366   context.registerVariable(new sys_var_const_string_val("data_home_dir",
03367                                                 vm["data-home-dir"].as<string>()));
03368   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("io_capacity", srv_io_capacity));
03369   context.registerVariable(new sys_var_constrained_value_readonly<uint32_t>("fast_shutdown", innobase_fast_shutdown));
03370   context.registerVariable(new sys_var_bool_ptr_readonly("file_per_table",
03371                                                          &srv_file_per_table));
03372   context.registerVariable(new sys_var_bool_ptr_readonly("rollback_on_timeout",
03373                                                          &innobase_rollback_on_timeout));
03374   context.registerVariable(new sys_var_bool_ptr_readonly("print_verbose_log",
03375                                                          &innobase_print_verbose_log));
03376   context.registerVariable(new sys_var_bool_ptr("status_file",
03377                                                 &innobase_create_status_file,
03378                                                 haildb_status_file_update));
03379   context.registerVariable(new sys_var_bool_ptr_readonly("use_sys_malloc",
03380                                                          &srv_use_sys_malloc));
03381   context.registerVariable(new sys_var_std_string("file_format",
03382                                                   innobase_file_format_name,
03383                                                   haildb_file_format_name_validate));
03384   context.registerVariable(new sys_var_constrained_value_readonly<uint32_t>("flush_log_at_trx_commit", srv_flush_log_at_trx_commit));
03385   context.registerVariable(new sys_var_const_string_val("flush_method",
03386                                                 vm.count("flush-method") ?  vm["flush-method"].as<string>() : ""));
03387   context.registerVariable(new sys_var_constrained_value_readonly<uint32_t>("force_recovery", innobase_force_recovery));
03388   context.registerVariable(new sys_var_const_string_val("log_group_home_dir",
03389                                                 vm.count("log-group-home-dir") ?  vm["log-group-home-dir"].as<string>() : ""));
03390   context.registerVariable(new sys_var_constrained_value<int64_t>("log_file_size", haildb_log_file_size));
03391   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("log_files_in_group", haildb_log_files_in_group));
03392   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("lock_wait_timeout", innobase_lock_wait_timeout));
03393   context.registerVariable(new sys_var_constrained_value_readonly<long>("log_buffer_size", innobase_log_buffer_size));
03394   context.registerVariable(new sys_var_constrained_value<unsigned int>("lru_old_blocks_pct", innobase_lru_old_blocks_pct, haildb_lru_old_blocks_pct_update));
03395   context.registerVariable(new sys_var_uint32_t_ptr("lru_block_access_recency",
03396                                                     &innobase_lru_block_access_recency,
03397                                                     haildb_lru_block_access_recency_update));
03398   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("max_dirty_pages_pct", haildb_max_dirty_pages_pct));
03399   context.registerVariable(new sys_var_constrained_value_readonly<uint64_t>("max_purge_lag", haildb_max_purge_lag));
03400   context.registerVariable(new sys_var_constrained_value_readonly<uint64_t>("sync_spin_loops", haildb_sync_spin_loops));
03401   context.registerVariable(new sys_var_constrained_value_readonly<uint32_t>("open_files", haildb_open_files));
03402   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("read_io_threads", haildb_read_io_threads));
03403   context.registerVariable(new sys_var_constrained_value_readonly<unsigned int>("write_io_threads", haildb_write_io_threads));
03404 
03405   haildb_datadict_dump_func_initialize(context);
03406   config_table_function_initialize(context);
03407   status_table_function_initialize(context);
03408 
03409   return 0;
03410 haildb_error:
03411   fprintf(stderr, _("Error starting HailDB %d (%s)\n"),
03412           err, ib_strerror(err));
03413   return -1;
03414 }
03415 
03416 
03417 HailDBEngine::~HailDBEngine()
03418 {
03419   ib_err_t err;
03420   ib_shutdown_t shutdown_flag= IB_SHUTDOWN_NORMAL;
03421 
03422   if (innobase_fast_shutdown.get() == 1)
03423     shutdown_flag= IB_SHUTDOWN_NO_IBUFMERGE_PURGE;
03424   else if (innobase_fast_shutdown.get() == 2)
03425     shutdown_flag= IB_SHUTDOWN_NO_BUFPOOL_FLUSH;
03426 
03427   err= ib_shutdown(shutdown_flag);
03428 
03429   if (err != DB_SUCCESS)
03430   {
03431     fprintf(stderr,"Error %d shutting down HailDB!\n", err);
03432   }
03433 
03434 }
03435 
03436 
03437 static void init_options(drizzled::module::option_context &context)
03438 {
03439   context("disable-adaptive-hash-index",
03440           N_("Disable HailDB adaptive hash index (enabled by default)."));
03441   context("disable-adaptive-flushing",
03442           N_("Do not attempt to flush dirty pages to avoid IO bursts at checkpoints."));
03443   context("additional-mem-pool-size",
03444           po::value<additional_mem_pool_constraint>(&innobase_additional_mem_pool_size)->default_value(8*1024*1024L),
03445           N_("Size of a memory pool HailDB uses to store data dictionary information and other internal data structures."));
03446   context("autoextend-increment",
03447           po::value<autoextend_constraint>(&srv_auto_extend_increment)->default_value(8),
03448           N_("Data file autoextend increment in megabytes"));
03449   context("buffer-pool-size",
03450           po::value<buffer_pool_constraint>(&innobase_buffer_pool_size)->default_value(128*1024*1024L),
03451           N_("The size of the memory buffer HailDB uses to cache data and indexes of its tables."));
03452   context("data-home-dir",
03453           po::value<string>()->default_value(""),
03454           N_("The common part for HailDB table spaces."));
03455   context("disable-checksums",
03456           N_("Disable HailDB checksums validation (enabled by default)."));
03457   context("disable-doublewrite",
03458           N_("Disable HailDB doublewrite buffer (enabled by default)."));
03459   context("io-capacity",
03460           po::value<io_capacity_constraint>(&srv_io_capacity)->default_value(200),
03461           N_("Number of IOPs the server can do. Tunes the background IO rate"));
03462   context("fast-shutdown",
03463           po::value<trinary_constraint>(&innobase_fast_shutdown)->default_value(1),
03464           N_("Speeds up the shutdown process of the HailDB storage engine. Possible values are 0, 1 (faster) or 2 (fastest - crash-like)."));
03465   context("file-per-table",
03466           po::value<bool>(&srv_file_per_table)->default_value(false)->zero_tokens(),
03467           N_("Stores each HailDB table to an .ibd file in the database dir."));
03468   context("file-format",
03469           po::value<string>(&innobase_file_format_name)->default_value("Barracuda"),
03470           N_("File format to use for new tables in .ibd files."));
03471   context("flush-log-at-trx-commit",
03472           po::value<trinary_constraint>(&srv_flush_log_at_trx_commit)->default_value(1),
03473           N_("Set to 0 (write and flush once per second),1 (write and flush at each commit) or 2 (write at commit, flush once per second)."));
03474   context("flush-method",
03475           po::value<string>(),
03476           N_("With which method to flush data."));
03477   context("force-recovery",
03478           po::value<force_recovery_constraint>(&innobase_force_recovery)->default_value(0),
03479           N_("Helps to save your data in case the disk image of the database becomes corrupt."));
03480   context("data-file-path",
03481           po::value<string>()->default_value("ibdata1:10M:autoextend"),
03482           N_("Path to individual files and their sizes."));
03483   context("log-group-home-dir",
03484           po::value<string>(),
03485           N_("Path to HailDB log files."));
03486   context("log-file-size",
03487           po::value<log_file_constraint>(&haildb_log_file_size)->default_value(20*1024*1024L),
03488           N_("Size of each log file in a log group."));
03489   context("haildb-log-files-in-group",
03490           po::value<log_files_in_group_constraint>(&haildb_log_files_in_group)->default_value(2),
03491           N_("Number of log files in the log group. HailDB writes to the files in a circular fashion. Value 3 is recommended here."));
03492   context("lock-wait-timeout",
03493           po::value<lock_wait_constraint>(&innobase_lock_wait_timeout)->default_value(5),
03494           N_("Timeout in seconds an HailDB transaction may wait for a lock before being rolled back. Values above 100000000 disable the timeout."));
03495   context("log-buffer-size",
03496         po::value<log_buffer_size_constraint>(&innobase_log_buffer_size)->default_value(8*1024*1024L),
03497         N_("The size of the buffer which HailDB uses to write log to the log files on disk."));
03498   context("lru-old-blocks-pct",
03499           po::value<lru_old_blocks_constraint>(&innobase_lru_old_blocks_pct)->default_value(37),
03500           N_("Sets the point in the LRU list from where all pages are classified as old (Advanced users)"));
03501   context("lru-block-access-recency",
03502           po::value<uint32_t>(&innobase_lru_block_access_recency)->default_value(0),
03503           N_("Milliseconds between accesses to a block at which it is made young. 0=disabled (Advanced users)"));
03504   context("max-dirty-pages-pct",
03505           po::value<max_dirty_pages_constraint>(&haildb_max_dirty_pages_pct)->default_value(75),
03506           N_("Percentage of dirty pages allowed in bufferpool."));
03507   context("max-purge-lag",
03508           po::value<uint64_constraint>(&haildb_max_purge_lag)->default_value(0),
03509           N_("Desired maximum length of the purge queue (0 = no limit)"));
03510   context("rollback-on-timeout",
03511           po::value<bool>(&innobase_rollback_on_timeout)->default_value(false)->zero_tokens(),
03512           N_("Roll back the complete transaction on lock wait timeout, for 4.x compatibility (disabled by default)"));
03513   context("open-files",
03514           po::value<open_files_constraint>(&haildb_open_files)->default_value(300),
03515           N_("How many files at the maximum HailDB keeps open at the same time."));
03516   context("read-io-threads",
03517           po::value<io_threads_constraint>(&haildb_read_io_threads)->default_value(4),
03518           N_("Number of background read I/O threads in HailDB."));
03519   context("write-io-threads",
03520           po::value<io_threads_constraint>(&haildb_write_io_threads)->default_value(4),
03521           N_("Number of background write I/O threads in HailDB."));
03522   context("disable-print-verbose-log",
03523           N_("Disable if you want to reduce the number of messages written to the log (default: enabled)."));
03524   context("status-file",
03525           po::value<bool>(&innobase_create_status_file)->default_value(false)->zero_tokens(),
03526           N_("Enable SHOW HAILDB STATUS output in the log"));
03527   context("sync-spin-loops",
03528           po::value<uint64_constraint>(&haildb_sync_spin_loops)->default_value(30L),
03529           N_("Count of spin-loop rounds in HailDB mutexes (30 by default)"));
03530   context("use-internal-malloc",
03531           N_("Use HailDB's internal memory allocator instead of the OS memory allocator"));
03532 }
03533 
03534 DRIZZLE_DECLARE_PLUGIN
03535 {
03536   DRIZZLE_VERSION_ID,
03537   "INNODB",
03538   "1.0",
03539   "Stewart Smith",
03540   "Transactional Storage Engine using the HailDB Library",
03541   PLUGIN_LICENSE_GPL,
03542   haildb_init,     /* Plugin Init */
03543   NULL, /* depends */
03544   init_options                /* config options   */
03545 }
03546 DRIZZLE_DECLARE_PLUGIN_END;