00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include <config.h>
00017
00018 #include <iostream>
00019
00020 #include <drizzled/pthread_globals.h>
00021 #include <drizzled/module/option_map.h>
00022 #include <drizzled/errmsg_print.h>
00023 #include <drizzled/session.h>
00024 #include <drizzled/session/cache.h>
00025 #include <drizzled/abort_exception.h>
00026 #include <drizzled/transaction_services.h>
00027 #include <drizzled/gettext.h>
00028
00029 #include <boost/thread.hpp>
00030 #include <boost/bind.hpp>
00031 #include <boost/program_options.hpp>
00032
00033 #include "multi_thread.h"
00034
00035 namespace po= boost::program_options;
00036 using namespace std;
00037 using namespace drizzled;
00038
00039
00040 typedef constrained_check<uint32_t, 4096, 1> max_threads_constraint;
00041 static max_threads_constraint max_threads;
00042
00043 namespace drizzled
00044 {
00045 extern size_t my_thread_stack_size;
00046 }
00047
00048 namespace multi_thread {
00049
00050 void MultiThreadScheduler::runSession(drizzled::session_id_t id)
00051 {
00052 char stack_dummy;
00053 boost::this_thread::disable_interruption disable_by_default;
00054
00055 Session::shared_ptr session(session::Cache::singleton().find(id));
00056
00057 try
00058 {
00059
00060 if (not session)
00061 {
00062 std::cerr << _("Session killed before thread could execute") << endl;
00063 return;
00064 }
00065 session->pushInterrupt(&disable_by_default);
00066
00067 if (drizzled::internal::my_thread_init())
00068 {
00069 session->disconnect(drizzled::ER_OUT_OF_RESOURCES);
00070 session->status_var.aborted_connects++;
00071 }
00072 else
00073 {
00074 boost::this_thread::at_thread_exit(&internal::my_thread_end);
00075
00076 session->thread_stack= (char*) &stack_dummy;
00077 session->run();
00078 }
00079
00080 killSessionNow(session);
00081 }
00082 catch (abort_exception& ex)
00083 {
00084 cout << _("Drizzle has receieved an abort event.") << endl;
00085 cout << _("In Function: ") << *::boost::get_error_info<boost::throw_function>(ex) << endl;
00086 cout << _("In File: ") << *::boost::get_error_info<boost::throw_file>(ex) << endl;
00087 cout << _("On Line: ") << *::boost::get_error_info<boost::throw_line>(ex) << endl;
00088
00089 TransactionServices::singleton().sendShutdownEvent(*session.get());
00090 }
00091
00092
00093 while (not session.unique()) {}
00094 }
00095
00096 void MultiThreadScheduler::setStackSize()
00097 {
00098 pthread_attr_t attr;
00099
00100 (void) pthread_attr_init(&attr);
00101
00102
00103
00104 int err= pthread_attr_getstacksize(&attr, &my_thread_stack_size);
00105 pthread_attr_destroy(&attr);
00106
00107 if (err != 0)
00108 {
00109 errmsg_printf(error::ERROR, _("Unable to get thread stack size"));
00110 my_thread_stack_size= 524288;
00111 }
00112
00113 if (my_thread_stack_size == 0)
00114 {
00115 my_thread_stack_size= 524288;
00116 }
00117 #ifdef __sun
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 if (my_thread_stack_size == 0)
00128 {
00129 my_thread_stack_size= 2 * 1024 * 1024;
00130 }
00131 #endif
00132 }
00133
00134 bool MultiThreadScheduler::addSession(Session::shared_ptr &session)
00135 {
00136 if (thread_count >= max_threads)
00137 return true;
00138
00139 thread_count.increment();
00140 try
00141 {
00142 session->getThread().reset(new boost::thread((boost::bind(&MultiThreadScheduler::runSession, this, session->getSessionId()))));
00143 }
00144 catch (std::exception&)
00145 {
00146 thread_count.decrement();
00147 return true;
00148 }
00149
00150 if (not session->getThread())
00151 {
00152 thread_count.decrement();
00153 return true;
00154 }
00155
00156 if (not session->getThread()->joinable())
00157 {
00158 thread_count.decrement();
00159 return true;
00160 }
00161
00162 return false;
00163 }
00164
00165
00166 void MultiThreadScheduler::killSession(Session *session)
00167 {
00168 boost_thread_shared_ptr thread(session->getThread());
00169
00170 if (thread)
00171 {
00172 thread->interrupt();
00173 }
00174 }
00175
00176 void MultiThreadScheduler::killSessionNow(Session::shared_ptr &session)
00177 {
00178 killSession(session.get());
00179
00180 session->disconnect();
00181
00182
00183 Session::unlink(session);
00184 thread_count.decrement();
00185 }
00186
00187 MultiThreadScheduler::~MultiThreadScheduler()
00188 {
00189 boost::mutex::scoped_lock scopedLock(drizzled::session::Cache::singleton().mutex());
00190 while (thread_count)
00191 {
00192 COND_thread_count.wait(scopedLock);
00193 }
00194 }
00195
00196 }
00197
00198
00199 static int init(drizzled::module::Context &context)
00200 {
00201
00202 context.add(new multi_thread::MultiThreadScheduler("multi_thread"));
00203
00204 return 0;
00205 }
00206
00207 static void init_options(drizzled::module::option_context &context)
00208 {
00209 context("max-threads",
00210 po::value<max_threads_constraint>(&max_threads)->default_value(2048),
00211 _("Maximum number of user threads available."));
00212 }
00213
00214 DRIZZLE_DECLARE_PLUGIN
00215 {
00216 DRIZZLE_VERSION_ID,
00217 "multi_thread",
00218 "0.1",
00219 "Brian Aker",
00220 "One Thread Per Session Scheduler",
00221 PLUGIN_LICENSE_GPL,
00222 init,
00223 NULL,
00224 init_options
00225 }
00226 DRIZZLE_DECLARE_PLUGIN_END;