Drizzled Public API Documentation

multi_thread.cc
00001 /* Copyright (C) 2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 #include <config.h>
00017 
00018 #include <iostream>
00019 
00020 #include <drizzled/pthread_globals.h>
00021 #include <drizzled/module/option_map.h>
00022 #include <drizzled/errmsg_print.h>
00023 #include <drizzled/session.h>
00024 #include <drizzled/session/cache.h>
00025 #include <drizzled/abort_exception.h>
00026 #include <drizzled/transaction_services.h>
00027 #include <drizzled/gettext.h>
00028 
00029 #include <boost/thread.hpp>
00030 #include <boost/bind.hpp>
00031 #include <boost/program_options.hpp>
00032 
00033 #include "multi_thread.h"
00034 
00035 namespace po= boost::program_options;
00036 using namespace std;
00037 using namespace drizzled;
00038 
00039 /* Configuration variables. */
00040 typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
00041 static max_threads_constraint max_threads;
00042 
00043 namespace drizzled
00044 {
00045   extern size_t my_thread_stack_size;
00046 }
00047 
00048 namespace multi_thread {
00049 
00050 void MultiThreadScheduler::runSession(drizzled::session_id_t id)
00051 {
00052   char stack_dummy;
00053   boost::this_thread::disable_interruption disable_by_default;
00054 
00055   Session::shared_ptr session(session::Cache::singleton().find(id));
00056 
00057   try
00058   {
00059 
00060     if (not session)
00061     {
00062       std::cerr << _("Session killed before thread could execute") << endl;
00063       return;
00064     }
00065     session->pushInterrupt(&disable_by_default);
00066 
00067     if (drizzled::internal::my_thread_init())
00068     {
00069       session->disconnect(drizzled::ER_OUT_OF_RESOURCES);
00070       session->status_var.aborted_connects++;
00071     }
00072     else
00073     {
00074       boost::this_thread::at_thread_exit(&internal::my_thread_end);
00075 
00076       session->thread_stack= (char*) &stack_dummy;
00077       session->run();
00078     }
00079 
00080     killSessionNow(session);
00081   }
00082   catch (abort_exception& ex)
00083   {
00084     cout << _("Drizzle has receieved an abort event.") << endl;
00085     cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
00086     cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
00087     cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
00088 
00089     TransactionServices::singleton().sendShutdownEvent(*session.get());
00090   }
00091   // @todo remove hard spin by disconnection the session first from the
00092   // thread.
00093   while (not session.unique()) {}
00094 }
00095 
00096 void MultiThreadScheduler::setStackSize()
00097 {
00098   pthread_attr_t attr;
00099 
00100   (void) pthread_attr_init(&attr);
00101 
00102   /* Get the thread stack size that the OS will use and make sure
00103     that we update our global variable. */
00104   int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
00105   pthread_attr_destroy(&attr);
00106 
00107   if (err != 0)
00108   {
00109     errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
00110     my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
00111   }
00112 
00113   if (my_thread_stack_size == 0)
00114   {
00115     my_thread_stack_size= 524288; // At the time of the writing of this code, this was OSX's
00116   }
00117 #ifdef __sun
00118   /*
00119    * Solaris will return zero for the stack size in a call to
00120    * pthread_attr_getstacksize() to indicate that the OS default stack
00121    * size is used. We need an actual value in my_thread_stack_size so that
00122    * check_stack_overrun() will work. The Solaris man page for the
00123    * pthread_attr_getstacksize() function says that 2M is used for 64-bit
00124    * processes. We'll explicitly set it here to make sure that is what
00125    * will be used.
00126    */
00127   if (my_thread_stack_size == 0)
00128   {
00129     my_thread_stack_size= 2 * 1024 * 1024;
00130   }
00131 #endif
00132 }
00133 
00134 bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
00135 {
00136   if (thread_count >= max_threads)
00137     return true;
00138 
00139   thread_count.increment();
00140   try
00141   {
00142     session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
00143   }
00144   catch (std::exception&)
00145   {
00146     thread_count.decrement();
00147     return true;
00148   }
00149 
00150   if (not session->getThread())
00151   {
00152     thread_count.decrement();
00153     return true;
00154   }
00155 
00156   if (not session->getThread()->joinable())
00157   {
00158     thread_count.decrement();
00159     return true;
00160   }
00161 
00162   return false;
00163 }
00164 
00165 
00166 void MultiThreadScheduler::killSession(Session *session)
00167 {
00168   boost_thread_shared_ptr thread(session->getThread());
00169 
00170   if (thread)
00171   {
00172     thread->interrupt();
00173   }
00174 }
00175 
00176 void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
00177 {
00178   killSession(session.get());
00179 
00180   session->disconnect();
00181 
00182   /* Locks LOCK_thread_count and deletes session */
00183   Session::unlink(session);
00184   thread_count.decrement();
00185 }
00186 
00187 MultiThreadScheduler::~MultiThreadScheduler()
00188 {
00189   boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
00190   while (thread_count)
00191   {
00192     COND_thread_count.wait(scopedLock);
00193   }
00194 }
00195 
00196 } // multi_thread namespace
00197 
00198   
00199 static int init(drizzled::module::Context &context)
00200 {
00201   
00202   context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
00203 
00204   return 0;
00205 }
00206 
00207 static void init_options(drizzled::module::option_context &context)
00208 {
00209   context("max-threads",
00210           po::value<max_threads_constraint>(&max_threads)->default_value(2048),
00211           _("Maximum number of user threads available."));
00212 }
00213 
00214 DRIZZLE_DECLARE_PLUGIN
00215 {
00216   DRIZZLE_VERSION_ID,
00217   "multi_thread",
00218   "0.1",
00219   "Brian Aker",
00220   "One Thread Per Session Scheduler",
00221   PLUGIN_LICENSE_GPL,
00222   init, /* Plugin Init */
00223   NULL,   /* depends */
00224   init_options    /* config options */
00225 }
00226 DRIZZLE_DECLARE_PLUGIN_END;