Drizzled Public API Documentation

session.cc
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008 Sun Microsystems, Inc.
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; version 2 of the License.
00009  *
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018  */
00019 
00024 #include <config.h>
00025 
00026 #include <drizzled/copy_field.h>
00027 #include <drizzled/data_home.h>
00028 #include <drizzled/display.h>
00029 #include <drizzled/drizzled.h>
00030 #include <drizzled/error.h>
00031 #include <drizzled/gettext.h>
00032 #include <drizzled/identifier.h>
00033 #include <drizzled/internal/iocache.h>
00034 #include <drizzled/internal/thread_var.h>
00035 #include <drizzled/internal_error_handler.h>
00036 #include <drizzled/item/cache.h>
00037 #include <drizzled/item/empty_string.h>
00038 #include <drizzled/item/float.h>
00039 #include <drizzled/item/return_int.h>
00040 #include <drizzled/lock.h>
00041 #include <drizzled/plugin/authentication.h>
00042 #include <drizzled/plugin/client.h>
00043 #include <drizzled/plugin/event_observer.h>
00044 #include <drizzled/plugin/logging.h>
00045 #include <drizzled/plugin/query_rewrite.h>
00046 #include <drizzled/plugin/scheduler.h>
00047 #include <drizzled/plugin/transactional_storage_engine.h>
00048 #include <drizzled/probes.h>
00049 #include <drizzled/pthread_globals.h>
00050 #include <drizzled/query_id.h>
00051 #include <drizzled/refresh_version.h>
00052 #include <drizzled/select_dump.h>
00053 #include <drizzled/select_exists_subselect.h>
00054 #include <drizzled/select_export.h>
00055 #include <drizzled/select_max_min_finder_subselect.h>
00056 #include <drizzled/select_singlerow_subselect.h>
00057 #include <drizzled/select_subselect.h>
00058 #include <drizzled/select_to_file.h>
00059 #include <drizzled/session.h>
00060 #include <drizzled/session/cache.h>
00061 #include <drizzled/show.h>
00062 #include <drizzled/sql_base.h>
00063 #include <drizzled/table/singular.h>
00064 #include <drizzled/table_proto.h>
00065 #include <drizzled/tmp_table_param.h>
00066 #include <drizzled/transaction_services.h>
00067 #include <drizzled/user_var_entry.h>
00068 #include <drizzled/util/functors.h>
00069 #include <drizzled/util/find_ptr.h>
00070 #include <plugin/myisam/myisam.h>
00071 #include <drizzled/item/subselect.h>
00072 #include <drizzled/statement.h>
00073 #include <drizzled/sql_lex.h>
00074 
00075 #include <algorithm>
00076 #include <climits>
00077 #include <fcntl.h>
00078 #include <sys/stat.h>
00079 
00080 #include <boost/filesystem.hpp>
00081 #include <boost/checked_delete.hpp>
00082 
00083 #include <drizzled/util/backtrace.h>
00084 
00085 #include <drizzled/schema.h>
00086 
00087 using namespace std;
00088 
00089 namespace fs=boost::filesystem;
00090 namespace drizzled
00091 {
00092 
/*
  The following is used to initialise Table_ident with an internal
  table name
*/
char internal_table_name[2]= "*";
char empty_c_string[1]= {0};    /* used when no db is defined */

/*
  Initial value of Session::_where; cleanup_after_query() restores
  _where to this string after each statement.
*/
const char * const Session::DEFAULT_WHERE= "field list";
00101 
00102 bool Key_part_spec::operator==(const Key_part_spec& other) const
00103 {
00104   return length == other.length &&
00105          field_name.length == other.field_name.length &&
00106     !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
00107 }
00108 
00109 Open_tables_state::Open_tables_state(uint64_t version_arg) :
00110   version(version_arg)
00111 {
00112   open_tables= temporary_tables= derived_tables= NULL;
00113   extra_lock= lock= NULL;
00114 }
00115 
00116 /*
00117   The following functions form part of the C plugin API
00118 */
00119 int tmpfile(const char *prefix)
00120 {
00121   char filename[FN_REFLEN];
00122   int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
00123   if (fd >= 0) {
00124     unlink(filename);
00125   }
00126 
00127   return fd;
00128 }
00129 
/*
  Return the address of this session's ha_ptr slot for the given
  monitored object; the slot is selected by monitored->getId().
*/
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
{
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
}
00134 
/*
  Return a pointer to entry `index` of the resource_context belonging to
  the ha_data slot selected by monitored->getId().
  No bounds checking is performed on either index.
*/
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
                                             size_t index)
{
  return &ha_data[monitored->getId()].resource_context[index];
}
00140 
/*
  Return the subset of `test_options` bits that are set in
  session->options (zero if none are set).
*/
int64_t session_test_options(const Session *session, int64_t test_options)
{
  return session->options & test_options;
}
00145 
/*
  Private implementation holder for Session. It owns the LEX that
  Session::lex() hands out.
*/
class Session::impl_c
{
public:
  LEX lex;
};
00157 
/*
  Construct a Session for the given client and catalog.

  client_arg is dereferenced unconditionally (client->setSession), so it
  must not be NULL. Besides the member initialisers, the body sets up
  the main and warning memory roots, the per-session system variables
  (plugin_sessionvar_init), the lock info/owner, and registers the
  session event observers.
*/
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
  Open_tables_state(refresh_version),
  mem_root(&main_mem_root),
  query(new std::string),
  _schema(new std::string),
  client(client_arg),
  scheduler(NULL),
  scheduler_arg(NULL),
  lock_id(&main_lock_id),
  thread_stack(NULL),
  security_ctx(identifier::User::make_shared()),
  _where(Session::DEFAULT_WHERE),
  dbug_sentry(Session_SENTRY_MAGIC),
  mysys_var(0),
  command(COM_CONNECT),
  file_id(0),
  _epoch(boost::gregorian::date(1970,1,1)),
  _connect_time(boost::posix_time::microsec_clock::universal_time()),
  utime_after_lock(0),
  ha_data(plugin::num_trx_monitored_objects),
  query_id(0),
  warn_query_id(0),
  first_successful_insert_id_in_prev_stmt(0),
  first_successful_insert_id_in_cur_stmt(0),
  limit_found_rows(0),
  options(session_startup_options),
  row_count_func(-1),
  sent_row_count(0),
  examined_row_count(0),
  used_tables(0),
  total_warn_count(0),
  col_access(0),
  statement_id_counter(0),
  row_count(0),
  thread_id(0),
  tmp_table(0),
  _global_read_lock(NONE),
  count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
  _killed(NOT_KILLED),
  some_tables_deleted(false),
  no_errors(false),
  password(false),
  is_fatal_error(false),
  transaction_rollback_request(false),
  is_fatal_sub_stmt_error(0),
  derived_tables_processing(false),
  m_lip(NULL),
  cached_table(0),
  arg_of_last_insert_id_function(false),
  impl_(new impl_c),
  _catalog(catalog_arg),
  transaction_message(NULL),
  statement_message(NULL),
  session_event_observers(NULL),
  xa_id(0),
  concurrent_execute_allowed(true),
  tablespace_op(false),
  use_usage(false)
{
  /* Let the client know which session it now belongs to. */
  client->setSession(this);

  /*
    Pass nominal parameters to init_alloc_root only to ensure that
    the destructor works OK in case of an error. The main_mem_root
    will be re-initialized in init_for_queries().
  */
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
  /* sent_row_count and row_count were already zeroed in the initialiser list. */
  cuted_fields= sent_row_count= row_count= 0L;
  // Must be reset to handle error with Session's created for init of mysqld
  lex().current_select= 0;
  memset(&variables, 0, sizeof(variables));
  scoreboard_index= -1;
  cleanup_done= abort_on_warning= no_warnings_for_error= false;

  /* query_cache init */
  query_cache_key= "";
  resultset= NULL;

  /* Variables with default values */
  proc_info="login";

  plugin_sessionvar_init(this);
  /*
    The session variable initialisation above has reset
    variables.pseudo_thread_id to 0. We need to correct it here to
    avoid temporary tables replication failure.
  */
  variables.pseudo_thread_id= thread_id;
  server_status= SERVER_STATUS_AUTOCOMMIT;

  /* Keep OPTION_BIG_SELECTS in sync with an unlimited max_join_size. */
  if (variables.max_join_size == HA_POS_ERROR)
    options |= OPTION_BIG_SELECTS;
  else
    options &= ~OPTION_BIG_SELECTS;

  open_options=ha_open_options;
  update_lock_default= TL_WRITE;
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
  warn_list.clear();
  memset(warn_count, 0, sizeof(warn_count));
  memset(&status_var, 0, sizeof(status_var));

  /* Initialize sub structures */
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);

  substitute_null_with_insert_id = false;
  lock_info.init(); /* safety: will be reset after start */
  thr_lock_owner_init(&main_lock_id, &lock_info);

  m_internal_handler= NULL;

  plugin::EventObserver::registerSessionEvents(*this);
}
00271 
/* Read-only access to the LEX owned by this session's impl_ object. */
const LEX& Session::lex() const
{
  return impl_->lex;
}
00276 
/* Mutable access to the LEX owned by this session's impl_ object. */
LEX& Session::lex()
{
  return impl_->lex;
}
00281 
/* Return the sql_command currently stored in this session's LEX. */
enum_sql_command Session::getSqlCommand() const
{
  return lex().sql_command;
}
00286 
/* Store `v` as the sql_command in the LEX of the owning session. */
void statement::Statement::set_command(enum_sql_command v)
{
  session().lex().sql_command= v;
}
00291 
/* Shortcut for the LEX of the session this statement belongs to. */
LEX& statement::Statement::lex()
{
  return session().lex();
}
00296 
/* Shortcut for the `transaction` member of the owning session. */
session::Transactions& statement::Statement::transaction()
{
  return session().transaction;
}
00301 
/*
  Forward `item` to add_item_to_list() of the LEX's current_select.
  current_select is dereferenced without a NULL check.
*/
bool Session::add_item_to_list(Item *item)
{
  return lex().current_select->add_item_to_list(this, item);
}
00306 
/* Append `value` to the LEX's value_list and return push_back()'s result. */
bool Session::add_value_to_list(Item *value)
{
  return lex().value_list.push_back(value);
}
00311 
/*
  Forward an ORDER BY element to the LEX's current_select.
  current_select is dereferenced without a NULL check.
*/
bool Session::add_order_to_list(Item *item, bool asc)
{
  return lex().current_select->add_order_to_list(this, item, asc);
}
00316 
/*
  Forward a GROUP BY element to the LEX's current_select.
  current_select is dereferenced without a NULL check.
*/
bool Session::add_group_to_list(Item *item, bool asc)
{
  return lex().current_select->add_group_to_list(this, item, asc);
}
00321 
00322 void Session::free_items()
00323 {
00324   Item *next;
00325   /* This works because items are allocated with memory::sql_alloc() */
00326   for (; free_list; free_list= next)
00327   {
00328     next= free_list->next;
00329     free_list->delete_self();
00330   }
00331 }
00332 
/*
  Install `handler` as the session's internal error handler.
  Only one handler may be installed at a time (asserted).
*/
void Session::push_internal_handler(Internal_error_handler *handler)
{
  /*
    TODO: The current implementation is limited to 1 handler at a time only.
    Session and sp_rcontext need to be modified to use a common handler stack.
  */
  assert(m_internal_handler == NULL);
  m_internal_handler= handler;
}
00342 
00343 bool Session::handle_error(drizzled::error_t sql_errno, const char *message,
00344                            DRIZZLE_ERROR::enum_warning_level level)
00345 {
00346   if (m_internal_handler)
00347   {
00348     return m_internal_handler->handle_error(sql_errno, message, level, this);
00349   }
00350 
00351   return false;                                 // 'false', as per coding style
00352 }
00353 
00354 void Session::setAbort(bool arg)
00355 {
00356   mysys_var->abort= arg;
00357 }
00358 
00359 void Session::lockOnSys()
00360 {
00361   if (not mysys_var)
00362     return;
00363 
00364   setAbort(true);
00365   boost_unique_lock_t scopedLock(mysys_var->mutex);
00366   if (mysys_var->current_cond)
00367   {
00368     mysys_var->current_mutex->lock();
00369     mysys_var->current_cond->notify_all();
00370     mysys_var->current_mutex->unlock();
00371   }
00372 }
00373 
/*
  Remove the currently installed internal error handler.
  A handler must be installed (asserted).
*/
void Session::pop_internal_handler()
{
  assert(m_internal_handler != NULL);
  m_internal_handler= NULL;
}
00379 
/*
  Copy the XID of the session's current transaction state into *xid.
  NOTE(review): the cast reinterprets transaction.xid_state.xid as a
  DrizzleXid; this presumably relies on the two types being layout
  compatible -- confirm against their definitions.
*/
void Session::get_xid(DrizzleXid *xid)
{
  *xid = *(DrizzleXid *) &transaction.xid_state.xid;
}
00384 
00385 /* Do operations that may take a long time */
00386 
00387 void Session::cleanup(void)
00388 {
00389   assert(cleanup_done == false);
00390 
00391   setKilled(KILL_CONNECTION);
00392 #ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
00393   if (transaction.xid_state.xa_state == XA_PREPARED)
00394   {
00395 #error xid_state in the cache should be replaced by the allocated value
00396   }
00397 #endif
00398   {
00399     TransactionServices &transaction_services= TransactionServices::singleton();
00400     transaction_services.rollbackTransaction(*this, true);
00401   }
00402 
00403   for (UserVars::iterator iter= user_vars.begin();
00404        iter != user_vars.end();
00405        iter++)
00406   {
00407     user_var_entry *entry= iter->second;
00408     boost::checked_delete(entry);
00409   }
00410   user_vars.clear();
00411 
00412 
00413   close_temporary_tables();
00414 
00415   if (global_read_lock)
00416   {
00417     unlockGlobalReadLock();
00418   }
00419 
00420   cleanup_done= true;
00421 }
00422 
/*
  Tear down the session: force-close a still connected client, delete
  the client object, run cleanup() if it has not run yet, release
  engine and plugin session state, free the memory roots and deregister
  all event observers.
*/
Session::~Session()
{
  this->checkSentry();

  /* A client that is still connected at this point is being dropped forcibly. */
  if (client and client->isConnected())
  {
    assert(security_ctx);
    if (global_system_variables.log_warnings)
    {
      errmsg_printf(error::WARN, ER(ER_FORCING_CLOSE),
                    internal::my_progname,
                    thread_id,
                    security_ctx->username().c_str());
    }

    disconnect();
  }

  /* Close connection */
  if (client)
  {
    client->close();
    boost::checked_delete(client);
    client= NULL;
  }

  if (cleanup_done == false)
    cleanup();

  plugin::StorageEngine::closeConnection(this);
  plugin_sessionvar_cleanup(this);

  warn_root.free_root(MYF(0));
  mysys_var=0;          // Safety (shouldn't be needed)
  dbug_sentry= Session_SENTRY_GONE;

  main_mem_root.free_root(MYF(0));
  /* Give up the current-thread pointers without deleting what they point to. */
  currentMemRoot().release();
  currentSession().release();

  plugin::Logging::postEndDo(this);
  plugin::EventObserver::deregisterSessionEvents(session_event_observers); 
 
  // Free all schema event observer lists.
  for (std::map<std::string, plugin::EventObserverList *>::iterator it=schema_event_observers.begin() ; it != schema_event_observers.end(); it++ )
    plugin::EventObserver::deregisterSchemaEvents(it->second);

}
00471 
/*
  Replace the session's client and tell the new client which session it
  belongs to. client_arg is dereferenced, so it must not be NULL.
  The previously held client pointer is overwritten without being
  closed or deleted here.
*/
void Session::setClient(plugin::Client *client_arg)
{
  client= client_arg;
  client->setSession(this);
}
00477 
/*
  Mark the session as killed with `state_to_set`, notify the scheduler
  and wake the session's thread if it is waiting on a condition.

  A KILL_QUERY aimed at a session whose command is COM_SLEEP is ignored.
  `scheduler` is dereferenced without a NULL check.
*/
void Session::awake(Session::killed_state_t state_to_set)
{
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
    return;

  this->checkSentry();

  setKilled(state_to_set);
  scheduler->killSession(this);

  if (state_to_set != Session::KILL_QUERY)
  {
    DRIZZLE_CONNECTION_DONE(thread_id);
  }

  if (mysys_var)
  {
    boost_unique_lock_t scopedLock(mysys_var->mutex);
    /*
      This broadcast could be up in the air if the victim thread
      exits the cond in the time between read and broadcast, but that is
      ok since all we want to do is to make the victim thread get out
      of waiting on current_cond.
      If we see a non-zero current_cond: it cannot be an old value (because
      then exit_cond() should have run and it can't because we have mutex); so
      it is the true value but maybe current_mutex is not yet non-zero (we're
      in the middle of enter_cond() and there is a "memory order
      inversion"). So we test the mutex too to not lock 0.

      Note that there is a small chance we fail to kill. If victim has locked
      current_mutex, but hasn't yet entered enter_cond() (which means that
      current_cond and current_mutex are 0), then the victim will not get
      a signal and it may wait "forever" on the cond (until
      we issue a second KILL or the status it's waiting for happens).
      It's true that we have set its session->killed but it may not
      see it immediately and so may have time to reach the cond_wait().
    */
    if (mysys_var->current_cond && mysys_var->current_mutex)
    {
      mysys_var->current_mutex->lock();
      mysys_var->current_cond->notify_all();
      mysys_var->current_mutex->unlock();
    }
  }
}
00524 
/*
  Remember the location of thread info, the structure needed for
  memory::sql_alloc() and the structure for the net buffer.

  Binds this session and its mem_root to the calling thread and attaches
  the thread's my_thread_var as mysys_var.

  @return always false in the current implementation
*/
bool Session::storeGlobals()
{
  /*
    Assert that thread_stack is initialized: it's necessary to be able
    to track stack overrun.
  */
  assert(thread_stack);

  /* release() before reset() so the previous pointee is dropped, not deleted. */
  currentSession().release();
  currentSession().reset(this);

  currentMemRoot().release();
  currentMemRoot().reset(&mem_root);

  mysys_var=my_thread_var;

  /*
    Let the server define the thread id (not mysys)
    This allows us to move Session to different threads if needed.
  */
  mysys_var->id= thread_id;

  /*
    We have to call lock_info.init() again here as Session may have been
    created in another thread
  */
  lock_info.init();

  return false;
}
00559 
/*
  Init Session for query processing.
  This has to be called once before we call mysql_parse.
  See also comments in session.h.

  Resets the table version, proc info, command (to COM_SLEEP) and time,
  applies the configured query allocation sizes to mem_root and clears
  the XID of the transaction state.
*/

void Session::prepareForQueries()
{
  if (variables.max_join_size == HA_POS_ERROR)
    options |= OPTION_BIG_SELECTS;

  version= refresh_version;
  set_proc_info(NULL);
  command= COM_SLEEP;
  set_time();

  mem_root->reset_root_defaults(variables.query_alloc_block_size,
                                variables.query_prealloc_size);
  transaction.xid_state.xid.null();
  transaction.xid_state.in_session=1;
  /* Usage statistics are only reset when usage tracking is enabled. */
  if (use_usage)
    resetUsage();
}
00583 
00584 bool Session::initGlobals()
00585 {
00586   if (storeGlobals())
00587   {
00588     disconnect(ER_OUT_OF_RESOURCES);
00589     status_var.aborted_connects++;
00590     return true;
00591   }
00592   return false;
00593 }
00594 
00595 void Session::run()
00596 {
00597   if (initGlobals() || authenticate())
00598   {
00599     disconnect();
00600     return;
00601   }
00602 
00603   prepareForQueries();
00604 
00605   while (not client->haveError() && getKilled() != KILL_CONNECTION)
00606   {
00607     if (not executeStatement())
00608       break;
00609   }
00610 
00611   disconnect();
00612 }
00613 
00614 bool Session::schedule(Session::shared_ptr &arg)
00615 {
00616   arg->scheduler= plugin::Scheduler::getScheduler();
00617   assert(arg->scheduler);
00618 
00619   ++connection_count;
00620 
00621   long current_connections= connection_count;
00622 
00623   if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
00624   {
00625     current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
00626   }
00627 
00628   current_global_counters.connections++;
00629   arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
00630 
00631   session::Cache::singleton().insert(arg);
00632 
00633   if (unlikely(plugin::EventObserver::connectSession(*arg)))
00634   {
00635     // We should do something about an error...
00636   }
00637 
00638   if (plugin::Scheduler::getScheduler()->addSession(arg))
00639   {
00640     DRIZZLE_CONNECTION_START(arg->getSessionId());
00641     char error_message_buff[DRIZZLE_ERRMSG_SIZE];
00642 
00643     arg->setKilled(Session::KILL_CONNECTION);
00644 
00645     arg->status_var.aborted_connects++;
00646 
00647     /* Can't use my_error() since store_globals has not been called. */
00648     /* TODO replace will better error message */
00649     snprintf(error_message_buff, sizeof(error_message_buff),
00650              ER(ER_CANT_CREATE_THREAD), 1);
00651     arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
00652 
00653     return true;
00654   }
00655 
00656   return false;
00657 }
00658 
00659 
/*
  Is this session viewable by the given user?
  Delegates the decision to plugin::Authorization::isAuthorized().
*/
bool Session::isViewable(identifier::User::const_reference user_arg) const
{
  return plugin::Authorization::isAuthorized(user_arg, *this, false);
}
00667 
00668 
/*
  Publish the condition variable and mutex this session is about to wait
  on, so that Session::awake() can wake it, and switch proc info to `msg`.

  The caller must already own `mutex`. mysys_var is dereferenced without
  a NULL check.

  @return the previous proc info, to be passed to exit_cond()
*/
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
{
  const char* old_msg = get_proc_info();
  safe_mutex_assert_owner(mutex);
  mysys_var->current_mutex = &mutex;
  mysys_var->current_cond = &cond;
  this->set_proc_info(msg);
  return old_msg;
}
00678 
/*
  Counterpart of enter_cond(): unlock the mutex published there, clear
  the published condition/mutex pointers and restore proc info to
  `old_msg`.
*/
void Session::exit_cond(const char* old_msg)
{
  /*
    Putting the mutex unlock in exit_cond() ensures that
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
    locked (if that would not be the case, you'll get a deadlock if someone
    does a Session::awake() on you).
  */
  mysys_var->current_mutex->unlock();
  boost_unique_lock_t scopedLock(mysys_var->mutex);
  mysys_var->current_mutex = 0;
  mysys_var->current_cond = 0;
  this->set_proc_info(old_msg);
}
00693 
00694 bool Session::authenticate()
00695 {
00696   if (client->authenticate())
00697     return false;
00698 
00699   status_var.aborted_connects++;
00700 
00701   return true;
00702 }
00703 
00704 bool Session::checkUser(const std::string &passwd_str,
00705                         const std::string &in_db)
00706 {
00707   bool is_authenticated=
00708     plugin::Authentication::isAuthenticated(*user(), passwd_str);
00709 
00710   if (is_authenticated != true)
00711   {
00712     status_var.access_denied++;
00713     /* isAuthenticated has pushed the error message */
00714     return false;
00715   }
00716 
00717   /* Change database if necessary */
00718   if (not in_db.empty())
00719   {
00720     identifier::Schema identifier(in_db);
00721     if (schema::change(*this, identifier))
00722     {
00723       /* change_db() has pushed the error message. */
00724       return false;
00725     }
00726   }
00727   my_ok();
00728   password= not passwd_str.empty();
00729 
00730   /* Ready to handle queries */
00731   return true;
00732 }
00733 
00734 bool Session::executeStatement()
00735 {
00736   char *l_packet= 0;
00737   uint32_t packet_length;
00738 
00739   enum enum_server_command l_command;
00740 
00741   /*
00742     indicator of uninitialized lex => normal flow of errors handling
00743     (see my_message_sql)
00744   */
00745   lex().current_select= 0;
00746   clear_error();
00747   main_da.reset_diagnostics_area();
00748 
00749   if (client->readCommand(&l_packet, &packet_length) == false)
00750   {
00751     return false;
00752   }
00753 
00754   if (getKilled() == KILL_CONNECTION)
00755     return false;
00756 
00757   if (packet_length == 0)
00758     return true;
00759 
00760   l_command= static_cast<enum_server_command>(l_packet[0]);
00761 
00762   if (command >= COM_END)
00763     command= COM_END;                           // Wrong command
00764 
00765   assert(packet_length);
00766   return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
00767 }
00768 
00769 bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
00770 {
00771   /* Remove garbage at start and end of query */
00772   while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
00773   {
00774     in_packet++;
00775     in_packet_length--;
00776   }
00777   const char *pos= in_packet + in_packet_length; /* Point at end null */
00778   while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
00779   {
00780     pos--;
00781     in_packet_length--;
00782   }
00783 
00784   std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
00785   // We can not be entirely sure _schema has a value
00786   if (_schema)
00787   {
00788     plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
00789   }
00790   query.reset(new_query);
00791   _state.reset(new session::State(in_packet, in_packet_length));
00792 
00793   return true;
00794 }
00795 
00796 bool Session::endTransaction(enum enum_mysql_completiontype completion)
00797 {
00798   bool do_release= 0;
00799   bool result= true;
00800   TransactionServices &transaction_services= TransactionServices::singleton();
00801 
00802   if (transaction.xid_state.xa_state != XA_NOTR)
00803   {
00804     my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
00805     return false;
00806   }
00807   switch (completion)
00808   {
00809     case COMMIT:
00810       /*
00811        * We don't use endActiveTransaction() here to ensure that this works
00812        * even if there is a problem with the OPTION_AUTO_COMMIT flag
00813        * (Which of course should never happen...)
00814        */
00815       server_status&= ~SERVER_STATUS_IN_TRANS;
00816       if (transaction_services.commitTransaction(*this, true))
00817         result= false;
00818       options&= ~(OPTION_BEGIN);
00819       break;
00820     case COMMIT_RELEASE:
00821       do_release= 1; /* fall through */
00822     case COMMIT_AND_CHAIN:
00823       result= endActiveTransaction();
00824       if (result == true && completion == COMMIT_AND_CHAIN)
00825         result= startTransaction();
00826       break;
00827     case ROLLBACK_RELEASE:
00828       do_release= 1; /* fall through */
00829     case ROLLBACK:
00830     case ROLLBACK_AND_CHAIN:
00831     {
00832       server_status&= ~SERVER_STATUS_IN_TRANS;
00833       if (transaction_services.rollbackTransaction(*this, true))
00834         result= false;
00835       options&= ~(OPTION_BEGIN);
00836       if (result == true && (completion == ROLLBACK_AND_CHAIN))
00837         result= startTransaction();
00838       break;
00839     }
00840     default:
00841       my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
00842       return false;
00843   }
00844 
00845   if (result == false)
00846   {
00847     my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
00848   }
00849   else if ((result == true) && do_release)
00850   {
00851     setKilled(Session::KILL_CONNECTION);
00852   }
00853 
00854   return result;
00855 }
00856 
00857 bool Session::endActiveTransaction()
00858 {
00859   bool result= true;
00860   TransactionServices &transaction_services= TransactionServices::singleton();
00861 
00862   if (transaction.xid_state.xa_state != XA_NOTR)
00863   {
00864     my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
00865     return false;
00866   }
00867   if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00868   {
00869     server_status&= ~SERVER_STATUS_IN_TRANS;
00870     if (transaction_services.commitTransaction(*this, true))
00871       result= false;
00872   }
00873   options&= ~(OPTION_BEGIN);
00874   return result;
00875 }
00876 
00877 bool Session::startTransaction(start_transaction_option_t opt)
00878 {
00879   bool result= true;
00880 
00881   assert(! inTransaction());
00882 
00883   options|= OPTION_BEGIN;
00884   server_status|= SERVER_STATUS_IN_TRANS;
00885 
00886   if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
00887   {
00888     result= false;
00889   }
00890 
00891   return result;
00892 }
00893 
00894 void Session::cleanup_after_query()
00895 {
00896   /*
00897     Reset rand_used so that detection of calls to rand() will save random
00898     seeds if needed by the slave.
00899   */
00900   {
00901     /* Forget those values, for next binlogger: */
00902     auto_inc_intervals_in_cur_stmt_for_binlog.empty();
00903   }
00904   if (first_successful_insert_id_in_cur_stmt > 0)
00905   {
00906     /* set what LAST_INSERT_ID() will return */
00907     first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
00908     first_successful_insert_id_in_cur_stmt= 0;
00909     substitute_null_with_insert_id= true;
00910   }
00911 
00912   arg_of_last_insert_id_function= false;
00913 
00914   /* Free Items that were created during this execution */
00915   free_items();
00916 
00917   /* Reset _where. */
00918   _where= Session::DEFAULT_WHERE;
00919 
00920   /* Reset the temporary shares we built */
00921   for_each(temporary_shares.begin(),
00922            temporary_shares.end(),
00923            DeletePtr());
00924   temporary_shares.clear();
00925 }
00926 
/*
  Convenience overload: build a LEX_STRING from a std::string by
  forwarding its c_str() and length() to the (const char*, length)
  overload.
*/
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
                                     const std::string &str,
                                     bool allocate_lex_string)
{
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
}
00943 
00944 LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
00945                                      const char* str, uint32_t length,
00946                                      bool allocate_lex_string)
00947 {
00948   if (allocate_lex_string)
00949     if (!(lex_str= (LEX_STRING *)getMemRoot()->allocate(sizeof(LEX_STRING))))
00950       return 0;
00951   if (!(lex_str->str= mem_root->strmake_root(str, length)))
00952     return 0;
00953   lex_str->length= length;
00954   return lex_str;
00955 }
00956 
00957 int Session::send_explain_fields(select_result *result)
00958 {
00959   List<Item> field_list;
00960   Item *item;
00961   const CHARSET_INFO * const cs= system_charset_info;
00962   field_list.push_back(new Item_return_int("id",3, DRIZZLE_TYPE_LONGLONG));
00963   field_list.push_back(new Item_empty_string("select_type", 19, cs));
00964   field_list.push_back(item= new Item_empty_string("table", NAME_CHAR_LEN, cs));
00965   item->maybe_null= 1;
00966   field_list.push_back(item= new Item_empty_string("type", 10, cs));
00967   item->maybe_null= 1;
00968   field_list.push_back(item=new Item_empty_string("possible_keys",
00969               NAME_CHAR_LEN*MAX_KEY, cs));
00970   item->maybe_null=1;
00971   field_list.push_back(item=new Item_empty_string("key", NAME_CHAR_LEN, cs));
00972   item->maybe_null=1;
00973   field_list.push_back(item=
00974     new Item_empty_string("key_len",
00975                           MAX_KEY *
00976                           (MAX_KEY_LENGTH_DECIMAL_WIDTH + 1 /* for comma */),
00977                           cs));
00978   item->maybe_null=1;
00979   field_list.push_back(item=new Item_empty_string("ref",
00980                                                   NAME_CHAR_LEN*MAX_REF_PARTS,
00981                                                   cs));
00982   item->maybe_null=1;
00983   field_list.push_back(item= new Item_return_int("rows", 10,
00984                                                  DRIZZLE_TYPE_LONGLONG));
00985   if (lex().describe & DESCRIBE_EXTENDED)
00986   {
00987     field_list.push_back(item= new Item_float("filtered", 0.1234, 2, 4));
00988     item->maybe_null=1;
00989   }
00990   item->maybe_null= 1;
00991   field_list.push_back(new Item_empty_string("Extra", 255, cs));
00992   return (result->send_fields(field_list));
00993 }
00994 
00995 void select_result::send_error(drizzled::error_t errcode, const char *err)
00996 {
00997   my_message(errcode, err, MYF(0));
00998 }
00999 
01000 /************************************************************************
01001   Handling writing to file
01002 ************************************************************************/
01003 
01004 void select_to_file::send_error(drizzled::error_t errcode,const char *err)
01005 {
01006   my_message(errcode, err, MYF(0));
01007   if (file > 0)
01008   {
01009     (void) cache->end_io_cache();
01010     (void) internal::my_close(file, MYF(0));
01011     (void) internal::my_delete(path.file_string().c_str(), MYF(0));   // Delete file on error
01012     file= -1;
01013   }
01014 }
01015 
01016 
01017 bool select_to_file::send_eof()
01018 {
01019   int error= test(cache->end_io_cache());
01020   if (internal::my_close(file, MYF(MY_WME)))
01021     error= 1;
01022   if (!error)
01023   {
01024     /*
01025       In order to remember the value of affected rows for ROW_COUNT()
01026       function, SELECT INTO has to have an own SQLCOM.
01027       TODO: split from SQLCOM_SELECT
01028     */
01029     session->my_ok(row_count);
01030   }
01031   file= -1;
01032   return error;
01033 }
01034 
01035 
01036 void select_to_file::cleanup()
01037 {
01038   /* In case of error send_eof() may be not called: close the file here. */
01039   if (file >= 0)
01040   {
01041     (void) cache->end_io_cache();
01042     (void) internal::my_close(file, MYF(0));
01043     file= -1;
01044   }
01045   path= "";
01046   row_count= 0;
01047 }
01048 
01049 select_to_file::select_to_file(file_exchange *ex)
01050   : exchange(ex),
01051     file(-1),
01052     cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
01053     row_count(0L)
01054 {
01055   path= "";
01056 }
01057 
01058 select_to_file::~select_to_file()
01059 {
01060   cleanup();
01061 }
01062 
01063 /***************************************************************************
01064 ** Export of select to textfile
01065 ***************************************************************************/
01066 
01067 select_export::~select_export()
01068 {
01069   session->sent_row_count=row_count;
01070 }
01071 
01072 
01073 /*
01074   Create file with IO cache
01075 
01076   SYNOPSIS
01077     create_file()
01078     session     Thread handle
01079     path    File name
01080     exchange    Excange class
01081     cache   IO cache
01082 
01083   RETURN
01084     >= 0  File handle
01085    -1   Error
01086 */
01087 
01088 
01089 static int create_file(Session *session,
01090                        fs::path &target_path,
01091                        file_exchange *exchange,
01092                        internal::IO_CACHE *cache)
01093 {
01094   fs::path to_file(exchange->file_name);
01095   int file;
01096 
01097   if (not to_file.has_root_directory())
01098   {
01099     target_path= fs::system_complete(getDataHomeCatalog());
01100     util::string::const_shared_ptr schema(session->schema());
01101     if (schema and not schema->empty())
01102     {
01103       int count_elements= 0;
01104       for (fs::path::iterator iter= to_file.begin();
01105            iter != to_file.end();
01106            ++iter, ++count_elements)
01107       { }
01108 
01109       if (count_elements == 1)
01110       {
01111         target_path /= *schema;
01112       }
01113     }
01114     target_path /= to_file;
01115   }
01116   else
01117   {
01118     target_path = exchange->file_name;
01119   }
01120 
01121   if (not secure_file_priv.string().empty())
01122   {
01123     if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
01124     {
01125       /* Write only allowed to dir or subdir specified by secure_file_priv */
01126       my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
01127       return -1;
01128     }
01129   }
01130 
01131   if (!access(target_path.file_string().c_str(), F_OK))
01132   {
01133     my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
01134     return -1;
01135   }
01136   /* Create the file world readable */
01137   if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
01138     return file;
01139   (void) fchmod(file, 0666);      // Because of umask()
01140   if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
01141   {
01142     internal::my_close(file, MYF(0));
01143     internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
01144     return -1;
01145   }
01146   return file;
01147 }
01148 
01149 
01150 int
01151 select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
01152 {
01153   bool blob_flag=0;
01154   bool string_results= false, non_string_results= false;
01155   unit= u;
01156   if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
01157   {
01158     path= exchange->file_name;
01159   }
01160 
01161   /* Check if there is any blobs in data */
01162   {
01163     List<Item>::iterator li(list.begin());
01164     Item *item;
01165     while ((item=li++))
01166     {
01167       if (item->max_length >= MAX_BLOB_WIDTH)
01168       {
01169         blob_flag=1;
01170         break;
01171       }
01172 
01173       if (item->result_type() == STRING_RESULT)
01174         string_results= true;
01175       else
01176         non_string_results= true;
01177     }
01178   }
01179   field_term_length=exchange->field_term->length();
01180   field_term_char= field_term_length ?
01181                    (int) (unsigned char) (*exchange->field_term)[0] : INT_MAX;
01182   if (!exchange->line_term->length())
01183     exchange->line_term=exchange->field_term; // Use this if it exists
01184   field_sep_char= (exchange->enclosed->length() ?
01185                   (int) (unsigned char) (*exchange->enclosed)[0] : field_term_char);
01186   escape_char=  (exchange->escaped->length() ?
01187                 (int) (unsigned char) (*exchange->escaped)[0] : -1);
01188   is_ambiguous_field_sep= test(strchr(ESCAPE_CHARS, field_sep_char));
01189   is_unsafe_field_sep= test(strchr(NUMERIC_CHARS, field_sep_char));
01190   line_sep_char= (exchange->line_term->length() ?
01191                  (int) (unsigned char) (*exchange->line_term)[0] : INT_MAX);
01192   if (!field_term_length)
01193     exchange->opt_enclosed=0;
01194   if (!exchange->enclosed->length())
01195     exchange->opt_enclosed=1;     // A little quicker loop
01196   fixed_row_size= (!field_term_length && !exchange->enclosed->length() &&
01197        !blob_flag);
01198   if ((is_ambiguous_field_sep && exchange->enclosed->is_empty() &&
01199        (string_results || is_unsafe_field_sep)) ||
01200       (exchange->opt_enclosed && non_string_results &&
01201        field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
01202   {
01203     my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
01204     return 1;
01205   }
01206 
01207   if ((file= create_file(session, path, exchange, cache)) < 0)
01208     return 1;
01209 
01210   return 0;
01211 }
01212 
01213 bool select_export::send_data(List<Item> &items)
01214 {
01215   char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
01216   bool space_inited=0;
01217   String tmp(buff,sizeof(buff),&my_charset_bin),*res;
01218   tmp.length(0);
01219 
01220   if (unit->offset_limit_cnt)
01221   {           // using limit offset,count
01222     unit->offset_limit_cnt--;
01223     return false;
01224   }
01225   row_count++;
01226   Item *item;
01227   uint32_t used_length=0,items_left=items.size();
01228   List<Item>::iterator li(items.begin());
01229 
01230   if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
01231                  exchange->line_start->length()))
01232     return true;
01233 
01234   while ((item=li++))
01235   {
01236     Item_result result_type=item->result_type();
01237     bool enclosed = (exchange->enclosed->length() &&
01238                      (!exchange->opt_enclosed || result_type == STRING_RESULT));
01239     res=item->str_result(&tmp);
01240     if (res && enclosed)
01241     {
01242       if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
01243                      exchange->enclosed->length()))
01244         return true;
01245     }
01246     if (!res)
01247     {           // NULL
01248       if (!fixed_row_size)
01249       {
01250         if (escape_char != -1)      // Use \N syntax
01251         {
01252           null_buff[0]=escape_char;
01253           null_buff[1]='N';
01254           if (my_b_write(cache,(unsigned char*) null_buff,2))
01255             return true;
01256         }
01257         else if (my_b_write(cache,(unsigned char*) "NULL",4))
01258           return true;
01259       }
01260       else
01261       {
01262         used_length=0;        // Fill with space
01263       }
01264     }
01265     else
01266     {
01267       if (fixed_row_size)
01268         used_length= min(res->length(), static_cast<size_t>(item->max_length));
01269       else
01270         used_length= res->length();
01271 
01272       if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
01273           escape_char != -1)
01274       {
01275         char *pos, *start, *end;
01276         const CHARSET_INFO * const res_charset= res->charset();
01277         const CHARSET_INFO * const character_set_client= default_charset_info;
01278 
01279         bool check_second_byte= (res_charset == &my_charset_bin) &&
01280           character_set_client->
01281           escape_with_backslash_is_dangerous;
01282         assert(character_set_client->mbmaxlen == 2 ||
01283                !character_set_client->escape_with_backslash_is_dangerous);
01284         for (start=pos=(char*) res->ptr(),end=pos+used_length ;
01285              pos != end ;
01286              pos++)
01287         {
01288           if (use_mb(res_charset))
01289           {
01290             int l;
01291             if ((l=my_ismbchar(res_charset, pos, end)))
01292             {
01293               pos += l-1;
01294               continue;
01295             }
01296           }
01297 
01298           /*
01299             Special case when dumping BINARY/VARBINARY/BLOB values
01300             for the clients with character sets big5, cp932, gbk and sjis,
01301             which can have the escape character (0x5C "\" by default)
01302             as the second byte of a multi-byte sequence.
01303 
01304             If
01305             - pos[0] is a valid multi-byte head (e.g 0xEE) and
01306             - pos[1] is 0x00, which will be escaped as "\0",
01307 
01308             then we'll get "0xEE + 0x5C + 0x30" in the output file.
01309 
01310             If this file is later loaded using this sequence of commands:
01311 
01312             mysql> create table t1 (a varchar(128)) character set big5;
01313             mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
01314 
01315             then 0x5C will be misinterpreted as the second byte
01316             of a multi-byte character "0xEE + 0x5C", instead of
01317             escape character for 0x00.
01318 
01319             To avoid this confusion, we'll escape the multi-byte
01320             head character too, so the sequence "0xEE + 0x00" will be
01321             dumped as "0x5C + 0xEE + 0x5C + 0x30".
01322 
01323             Note, in the condition below we only check if
01324             mbcharlen is equal to 2, because there are no
01325             character sets with mbmaxlen longer than 2
01326             and with escape_with_backslash_is_dangerous set.
01327             assert before the loop makes that sure.
01328           */
01329 
01330           if ((needs_escaping(*pos, enclosed) ||
01331                (check_second_byte &&
01332                 my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
01333                 pos + 1 < end &&
01334                 needs_escaping(pos[1], enclosed))) &&
01335               /*
01336                 Don't escape field_term_char by doubling - doubling is only
01337                 valid for ENCLOSED BY characters:
01338               */
01339               (enclosed || !is_ambiguous_field_term ||
01340                (int) (unsigned char) *pos != field_term_char))
01341           {
01342             char tmp_buff[2];
01343             tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
01344                           is_ambiguous_field_sep) ?
01345               field_sep_char : escape_char;
01346             tmp_buff[1]= *pos ? *pos : '0';
01347             if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
01348                 my_b_write(cache,(unsigned char*) tmp_buff,2))
01349               return true;
01350             start=pos+1;
01351           }
01352         }
01353         if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
01354           return true;
01355       }
01356       else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
01357         return true;
01358     }
01359     if (fixed_row_size)
01360     {           // Fill with space
01361       if (item->max_length > used_length)
01362       {
01363         /* QQ:  Fix by adding a my_b_fill() function */
01364         if (!space_inited)
01365         {
01366           space_inited=1;
01367           memset(space, ' ', sizeof(space));
01368         }
01369         uint32_t length=item->max_length-used_length;
01370         for (; length > sizeof(space) ; length-=sizeof(space))
01371         {
01372           if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
01373             return true;
01374         }
01375         if (my_b_write(cache,(unsigned char*) space,length))
01376           return true;
01377       }
01378     }
01379     if (res && enclosed)
01380     {
01381       if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
01382                      exchange->enclosed->length()))
01383         return true;
01384     }
01385     if (--items_left)
01386     {
01387       if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
01388                      field_term_length))
01389         return true;
01390     }
01391   }
01392   if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
01393                  exchange->line_term->length()))
01394   {
01395     return true;
01396   }
01397 
01398   return false;
01399 }
01400 
01401 
01402 /***************************************************************************
01403 ** Dump  of select to a binary file
01404 ***************************************************************************/
01405 
01406 
01407 int
01408 select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
01409 {
01410   unit= u;
01411   return (int) ((file= create_file(session, path, exchange, cache)) < 0);
01412 }
01413 
01414 
01415 bool select_dump::send_data(List<Item> &items)
01416 {
01417   List<Item>::iterator li(items.begin());
01418   char buff[MAX_FIELD_WIDTH];
01419   String tmp(buff,sizeof(buff),&my_charset_bin),*res;
01420   tmp.length(0);
01421   Item *item;
01422 
01423   if (unit->offset_limit_cnt)
01424   {           // using limit offset,count
01425     unit->offset_limit_cnt--;
01426     return(0);
01427   }
01428   if (row_count++ > 1)
01429   {
01430     my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
01431     return 1;
01432   }
01433   while ((item=li++))
01434   {
01435     res=item->str_result(&tmp);
01436     if (!res)         // If NULL
01437     {
01438       if (my_b_write(cache,(unsigned char*) "",1))
01439         return 1;
01440     }
01441     else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
01442     {
01443       my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
01444       return 1;
01445     }
01446   }
01447   return(0);
01448 }
01449 
01450 
01451 select_subselect::select_subselect(Item_subselect *item_arg)
01452 {
01453   item= item_arg;
01454 }
01455 
01456 
01457 bool select_singlerow_subselect::send_data(List<Item> &items)
01458 {
01459   Item_singlerow_subselect *it= (Item_singlerow_subselect *)item;
01460   if (it->assigned())
01461   {
01462     my_message(ER_SUBQUERY_NO_1_ROW, ER(ER_SUBQUERY_NO_1_ROW), MYF(0));
01463     return(1);
01464   }
01465   if (unit->offset_limit_cnt)
01466   {                 // Using limit offset,count
01467     unit->offset_limit_cnt--;
01468     return(0);
01469   }
01470   List<Item>::iterator li(items.begin());
01471   Item *val_item;
01472   for (uint32_t i= 0; (val_item= li++); i++)
01473     it->store(i, val_item);
01474   it->assigned(1);
01475   return(0);
01476 }
01477 
01478 
01479 void select_max_min_finder_subselect::cleanup()
01480 {
01481   cache= 0;
01482 }
01483 
01484 
01485 bool select_max_min_finder_subselect::send_data(List<Item> &items)
01486 {
01487   Item_maxmin_subselect *it= (Item_maxmin_subselect *)item;
01488   List<Item>::iterator li(items.begin());
01489   Item *val_item= li++;
01490   it->register_value();
01491   if (it->assigned())
01492   {
01493     cache->store(val_item);
01494     if ((this->*op)())
01495       it->store(0, cache);
01496   }
01497   else
01498   {
01499     if (!cache)
01500     {
01501       cache= Item_cache::get_cache(val_item);
01502       switch (val_item->result_type())
01503       {
01504       case REAL_RESULT:
01505         op= &select_max_min_finder_subselect::cmp_real;
01506         break;
01507       case INT_RESULT:
01508         op= &select_max_min_finder_subselect::cmp_int;
01509         break;
01510       case STRING_RESULT:
01511         op= &select_max_min_finder_subselect::cmp_str;
01512         break;
01513       case DECIMAL_RESULT:
01514         op= &select_max_min_finder_subselect::cmp_decimal;
01515         break;
01516       case ROW_RESULT:
01517         // This case should never be choosen
01518         assert(0);
01519         op= 0;
01520       }
01521     }
01522     cache->store(val_item);
01523     it->store(0, cache);
01524   }
01525   it->assigned(1);
01526   return(0);
01527 }
01528 
01529 bool select_max_min_finder_subselect::cmp_real()
01530 {
01531   Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
01532   double val1= cache->val_real(), val2= maxmin->val_real();
01533   if (fmax)
01534     return (cache->null_value && !maxmin->null_value) ||
01535       (!cache->null_value && !maxmin->null_value &&
01536        val1 > val2);
01537   return (maxmin->null_value && !cache->null_value) ||
01538     (!cache->null_value && !maxmin->null_value &&
01539      val1 < val2);
01540 }
01541 
01542 bool select_max_min_finder_subselect::cmp_int()
01543 {
01544   Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
01545   int64_t val1= cache->val_int(), val2= maxmin->val_int();
01546   if (fmax)
01547     return (cache->null_value && !maxmin->null_value) ||
01548       (!cache->null_value && !maxmin->null_value &&
01549        val1 > val2);
01550   return (maxmin->null_value && !cache->null_value) ||
01551     (!cache->null_value && !maxmin->null_value &&
01552      val1 < val2);
01553 }
01554 
01555 bool select_max_min_finder_subselect::cmp_decimal()
01556 {
01557   Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
01558   type::Decimal cval, *cvalue= cache->val_decimal(&cval);
01559   type::Decimal mval, *mvalue= maxmin->val_decimal(&mval);
01560   if (fmax)
01561     return (cache->null_value && !maxmin->null_value) ||
01562       (!cache->null_value && !maxmin->null_value &&
01563        class_decimal_cmp(cvalue, mvalue) > 0) ;
01564   return (maxmin->null_value && !cache->null_value) ||
01565     (!cache->null_value && !maxmin->null_value &&
01566      class_decimal_cmp(cvalue,mvalue) < 0);
01567 }
01568 
01569 bool select_max_min_finder_subselect::cmp_str()
01570 {
01571   String *val1, *val2, buf1, buf2;
01572   Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
01573   /*
01574     as far as both operand is Item_cache buf1 & buf2 will not be used,
01575     but added for safety
01576   */
01577   val1= cache->val_str(&buf1);
01578   val2= maxmin->val_str(&buf1);
01579   if (fmax)
01580     return (cache->null_value && !maxmin->null_value) ||
01581       (!cache->null_value && !maxmin->null_value &&
01582        sortcmp(val1, val2, cache->collation.collation) > 0) ;
01583   return (maxmin->null_value && !cache->null_value) ||
01584     (!cache->null_value && !maxmin->null_value &&
01585      sortcmp(val1, val2, cache->collation.collation) < 0);
01586 }
01587 
01588 bool select_exists_subselect::send_data(List<Item> &)
01589 {
01590   Item_exists_subselect *it= (Item_exists_subselect *)item;
01591   if (unit->offset_limit_cnt)
01592   { // Using limit offset,count
01593     unit->offset_limit_cnt--;
01594     return(0);
01595   }
01596   it->value= 1;
01597   it->assigned(1);
01598   return(0);
01599 }
01600 
01601 /*
01602   Don't free mem_root, as mem_root is freed in the end of dispatch_command
01603   (once for any command).
01604 */
01605 void Session::end_statement()
01606 {
01607   /* Cleanup SQL processing state to reuse this statement in next query. */
01608   lex().end();
01609   query_cache_key= ""; // reset the cache key
01610   resetResultsetMessage();
01611 }
01612 
01613 bool Session::copy_db_to(char **p_db, size_t *p_db_length)
01614 {
01615   assert(_schema);
01616   if (_schema and _schema->empty())
01617   {
01618     my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
01619     return true;
01620   }
01621   else if (not _schema)
01622   {
01623     my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
01624     return true;
01625   }
01626   assert(_schema);
01627 
01628   *p_db= strmake(_schema->c_str(), _schema->size());
01629   *p_db_length= _schema->size();
01630 
01631   return false;
01632 }
01633 
01634 /****************************************************************************
01635   Tmp_Table_Param
01636 ****************************************************************************/
01637 
01638 void Tmp_Table_Param::init()
01639 {
01640   field_count= sum_func_count= func_count= hidden_field_count= 0;
01641   group_parts= group_length= group_null_parts= 0;
01642   quick_group= 1;
01643   table_charset= 0;
01644   precomputed_group_by= 0;
01645 }
01646 
01647 void Tmp_Table_Param::cleanup(void)
01648 {
01649   /* Fix for Intel compiler */
01650   if (copy_field)
01651   {
01652     boost::checked_array_delete(copy_field);
01653     save_copy_field= save_copy_field_end= copy_field= copy_field_end= 0;
01654   }
01655 }
01656 
01657 void Session::send_kill_message() const
01658 {
01659   drizzled::error_t err= static_cast<drizzled::error_t>(killed_errno());
01660   if (err != EE_OK)
01661     my_message(err, ER(err), MYF(0));
01662 }
01663 
01664 void Session::set_status_var_init()
01665 {
01666   memset(&status_var, 0, sizeof(status_var));
01667 }
01668 
01669 
01670 void Session::set_db(const std::string &new_db)
01671 {
01672   /* Do not reallocate memory if current chunk is big enough. */
01673   if (new_db.length())
01674   {
01675     _schema.reset(new std::string(new_db));
01676   }
01677   else
01678   {
01679     _schema.reset(new std::string(""));
01680   }
01681 }
01682 
01683 
01690 void Session::markTransactionForRollback(bool all)
01691 {
01692   is_fatal_sub_stmt_error= true;
01693   transaction_rollback_request= all;
01694 }
01695 
01696 void Session::disconnect(enum error_t errcode)
01697 {
01698   /* Allow any plugins to cleanup their session variables */
01699   plugin_sessionvar_cleanup(this);
01700 
01701   /* If necessary, log any aborted or unauthorized connections */
01702   if (getKilled() || client->wasAborted())
01703   {
01704     status_var.aborted_threads++;
01705   }
01706 
01707   if (client->wasAborted())
01708   {
01709     if (not getKilled() && variables.log_warnings > 1)
01710     {
01711       errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
01712                   , thread_id
01713                   , (_schema->empty() ? "unconnected" : _schema->c_str())
01714                   , security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
01715                   , security_ctx->address().c_str()
01716                   , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
01717     }
01718   }
01719 
01720   setKilled(Session::KILL_CONNECTION);
01721 
01722   if (client->isConnected())
01723   {
01724     if (errcode != EE_OK)
01725     {
01726       /*my_error(errcode, ER(errcode));*/
01727       client->sendError(errcode, ER(errcode));
01728     }
01729     client->close();
01730   }
01731 }
01732 
01733 void Session::reset_for_next_command()
01734 {
01735   free_list= 0;
01736   select_number= 1;
01737   /*
01738     Those two lines below are theoretically unneeded as
01739     Session::cleanup_after_query() should take care of this already.
01740   */
01741   auto_inc_intervals_in_cur_stmt_for_binlog.empty();
01742 
01743   is_fatal_error= false;
01744   server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
01745                           SERVER_QUERY_NO_INDEX_USED |
01746                           SERVER_QUERY_NO_GOOD_INDEX_USED);
01747 
01748   clear_error();
01749   main_da.reset_diagnostics_area();
01750   total_warn_count=0;     // Warnings for this query
01751   sent_row_count= examined_row_count= 0;
01752 }
01753 
01754 /*
01755   Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
01756 */
01757 
01758 void Open_tables_state::close_temporary_tables()
01759 {
01760   Table *table;
01761   Table *tmp_next;
01762 
01763   if (not temporary_tables)
01764     return;
01765 
01766   for (table= temporary_tables; table; table= tmp_next)
01767   {
01768     tmp_next= table->getNext();
01769     nukeTable(table);
01770   }
01771   temporary_tables= NULL;
01772 }
01773 
01774 /*
01775   unlink from session->temporary tables and close temporary table
01776 */
01777 
01778 void Open_tables_state::close_temporary_table(Table *table)
01779 {
01780   if (table->getPrev())
01781   {
01782     table->getPrev()->setNext(table->getNext());
01783     if (table->getPrev()->getNext())
01784     {
01785       table->getNext()->setPrev(table->getPrev());
01786     }
01787   }
01788   else
01789   {
01790     /* removing the item from the list */
01791     assert(table == temporary_tables);
01792     /*
01793       slave must reset its temporary list pointer to zero to exclude
01794       passing non-zero value to end_slave via rli->save_temporary_tables
01795       when no temp tables opened, see an invariant below.
01796     */
01797     temporary_tables= table->getNext();
01798     if (temporary_tables)
01799     {
01800       table->getNext()->setPrev(NULL);
01801     }
01802   }
01803   nukeTable(table);
01804 }
01805 
01806 /*
01807   Close and drop a temporary table
01808 
01809   NOTE
01810   This dosn't unlink table from session->temporary
01811   If this is needed, use close_temporary_table()
01812 */
01813 
01814 void Open_tables_state::nukeTable(Table *table)
01815 {
01816   plugin::StorageEngine *table_type= table->getShare()->db_type();
01817 
01818   table->free_io_cache();
01819   table->delete_table();
01820 
01821   identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
01822   rm_temporary_table(table_type, identifier);
01823 
01824   boost::checked_delete(table->getMutableShare());
01825 
01826   boost::checked_delete(table);
01827 }
01828 
01830 extern time_t flush_status_time;
01831 
01832 void Session::refresh_status()
01833 {
01834   /* Reset thread's status variables */
01835   memset(&status_var, 0, sizeof(status_var));
01836 
01837   flush_status_time= time((time_t*) 0);
01838   current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
01839   current_global_counters.connections= 0;
01840 }
01841 
01842 user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
01843 {
01844   return getVariable(std::string(name.str, name.length), create_if_not_exists);
01845 }
01846 
01847 user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
01848 {
01849   if (cleanup_done)
01850     return NULL;
01851 
01852   if (UserVars::mapped_type* iter= find_ptr(user_vars, name))
01853     return *iter;
01854 
01855   if (not create_if_not_exists)
01856     return NULL;
01857 
01858   user_var_entry *entry= NULL;
01859   entry= new (nothrow) user_var_entry(name.c_str(), query_id);
01860 
01861   if (entry == NULL)
01862     return NULL;
01863 
01864   std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
01865 
01866   if (not returnable.second)
01867   {
01868     boost::checked_delete(entry);
01869   }
01870 
01871   return entry;
01872 }
01873 
01874 void Session::setVariable(const std::string &name, const std::string &value)
01875 {
01876   user_var_entry *updateable_var= getVariable(name.c_str(), true);
01877   if (updateable_var)
01878   {
01879     updateable_var->update_hash(false,
01880                                 (void*)value.c_str(),
01881                                 static_cast<uint32_t>(value.length()), STRING_RESULT,
01882                                 &my_charset_bin,
01883                                 DERIVATION_IMPLICIT, false);
01884   }
01885 }
01886 
01887 void Open_tables_state::mark_temp_tables_as_free_for_reuse()
01888 {
01889   for (Table *table= temporary_tables ; table ; table= table->getNext())
01890   {
01891     if (table->query_id == getQueryId())
01892     {
01893       table->query_id= 0;
01894       table->cursor->ha_reset();
01895     }
01896   }
01897 }
01898 
01899 void Session::mark_used_tables_as_free_for_reuse(Table *table)
01900 {
01901   for (; table ; table= table->getNext())
01902   {
01903     if (table->query_id == getQueryId())
01904     {
01905       table->query_id= 0;
01906       table->cursor->ha_reset();
01907     }
01908   }
01909 }
01910 
01911 /*
01912   Unlocks tables and frees derived tables.
01913   Put all normal tables used by thread in free list.
01914 
01915   It will only close/mark as free for reuse tables opened by this
01916   substatement, it will also check if we are closing tables after
01917   execution of complete query (i.e. we are on upper level) and will
01918   leave prelocked mode if needed.
01919 */
01920 void Session::close_thread_tables()
01921 {
01922   clearDerivedTables();
01923 
01924   /*
01925     Mark all temporary tables used by this statement as free for reuse.
01926   */
01927   mark_temp_tables_as_free_for_reuse();
01928   /*
01929     Let us commit transaction for statement. Since in 5.0 we only have
01930     one statement transaction and don't allow several nested statement
01931     transactions this call will do nothing if we are inside of stored
01932     function or trigger (i.e. statement transaction is already active and
01933     does not belong to statement for which we do close_thread_tables()).
01934     TODO: This should be fixed in later releases.
01935    */
01936   {
01937     TransactionServices &transaction_services= TransactionServices::singleton();
01938     main_da.can_overwrite_status= true;
01939     transaction_services.autocommitOrRollback(*this, is_error());
01940     main_da.can_overwrite_status= false;
01941     transaction.stmt.reset();
01942   }
01943 
01944   if (lock)
01945   {
01946     /*
01947       For RBR we flush the pending event just before we unlock all the
01948       tables.  This means that we are at the end of a topmost
01949       statement, so we ensure that the STMT_END_F flag is set on the
01950       pending event.  For statements that are *inside* stored
01951       functions, the pending event will not be flushed: that will be
01952       handled either before writing a query log event (inside
01953       binlog_query()) or when preparing a pending event.
01954      */
01955     unlockTables(lock);
01956     lock= 0;
01957   }
01958   /*
01959     Note that we need to hold table::Cache::singleton().mutex() while changing the
01960     open_tables list. Another thread may work on it.
01961     (See: table::Cache::singleton().removeTable(), wait_completed_table())
01962     Closing a MERGE child before the parent would be fatal if the
01963     other thread tries to abort the MERGE lock in between.
01964   */
01965   if (open_tables)
01966     close_open_tables();
01967 }
01968 
01969 void Session::close_tables_for_reopen(TableList **tables)
01970 {
01971   /*
01972     If table list consists only from tables from prelocking set, table list
01973     for new attempt should be empty, so we have to update list's root pointer.
01974   */
01975   if (lex().first_not_own_table() == *tables)
01976     *tables= 0;
01977   lex().chop_off_not_own_tables();
01978   for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
01979     tmp->table= 0;
01980   close_thread_tables();
01981 }
01982 
01983 bool Session::openTablesLock(TableList *tables)
01984 {
01985   uint32_t counter;
01986   bool need_reopen;
01987 
01988   for ( ; ; )
01989   {
01990     if (open_tables_from_list(&tables, &counter))
01991       return true;
01992 
01993     if (not lock_tables(tables, counter, &need_reopen))
01994       break;
01995 
01996     if (not need_reopen)
01997       return true;
01998 
01999     close_tables_for_reopen(&tables);
02000   }
02001 
02002   return handle_derived(&lex(), &derived_prepare) || handle_derived(&lex(), &derived_filling);
02003 }
02004 
02005 /*
02006   @note "best_effort" is used in cases were if a failure occurred on this
02007   operation it would not be surprising because we are only removing because there
02008   might be an issue (lame engines).
02009 */
02010 
02011 bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
02012 {
02013   if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
02014   {
02015     if (not best_effort)
02016     {
02017       std::string path;
02018       identifier.getSQLPath(path);
02019       errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
02020                     path.c_str(), errno);
02021     }
02022 
02023     return true;
02024   }
02025 
02026   return false;
02027 }
02028 
02029 bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
02030 {
02031   drizzled::error_t error;
02032   assert(base);
02033 
02034   if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
02035   {
02036     std::string path;
02037     identifier.getSQLPath(path);
02038     errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
02039                   path.c_str(), error);
02040 
02041     return true;
02042   }
02043 
02044   return false;
02045 }
02046 
02051 void Open_tables_state::dumpTemporaryTableNames(const char *foo)
02052 {
02053   Table *table;
02054 
02055   if (not temporary_tables)
02056     return;
02057 
02058   cerr << "Begin Run: " << foo << "\n";
02059   for (table= temporary_tables; table; table= table->getNext())
02060   {
02061     bool have_proto= false;
02062 
02063     message::Table *proto= table->getShare()->getTableMessage();
02064     if (table->getShare()->getTableMessage())
02065       have_proto= true;
02066 
02067     const char *answer= have_proto ? "true" : "false";
02068 
02069     if (have_proto)
02070     {
02071       cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
02072       cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
02073     }
02074     else
02075     {
02076       cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
02077     }
02078   }
02079 }
02080 
02081 table::Singular *Session::getInstanceTable()
02082 {
02083   temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
02084 
02085   table::Singular *tmp_share= temporary_shares.back();
02086 
02087   assert(tmp_share);
02088 
02089   return tmp_share;
02090 }
02091 
02092 
02111 table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
02112 {
02113   temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
02114 
02115   table::Singular *tmp_share= temporary_shares.back();
02116 
02117   assert(tmp_share);
02118 
02119   return tmp_share;
02120 }
02121 
02122 namespace display  {
02123 
02124 static const std::string NONE= "NONE";
02125 static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
02126 static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
02127 
02128 const std::string &type(drizzled::Session::global_read_lock_t type)
02129 {
02130   switch (type) {
02131     default:
02132     case Session::NONE:
02133       return NONE;
02134     case Session::GOT_GLOBAL_READ_LOCK:
02135       return GOT_GLOBAL_READ_LOCK;
02136     case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
02137       return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
02138   }
02139 }
02140 
02141 size_t max_string_length(drizzled::Session::global_read_lock_t)
02142 {
02143   return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
02144 }
02145 
02146 } /* namespace display */
02147 
02148 } /* namespace drizzled */