Drizzled Public API Documentation

CSThread.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh (H&G2JCtL)
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-05-20
00023  *
00024  * CORE SYSTEM:
00025  * An independently running thread.
00026  *
00027  */
00028 
00029 #include "CSConfig.h"
00030 
00031 #ifdef OS_WINDOWS
00032 #include <signal.h>
00033 //#include "uniwin.h"
00034 #define SIGUSR1 30
00035 #define SIGUSR2 31
00036 
00037 #else
00038 #include <signal.h>
00039 #include <sys/signal.h>
00040 #include <unistd.h>
00041 #endif
00042 #include <errno.h>
00043 
00044 #include "CSGlobal.h"
00045 #include "CSLog.h"
00046 #include "CSException.h"
00047 #include "CSThread.h"
00048 #include "CSStrUtil.h"
00049 #include "CSMemory.h"
00050 
00051 #define PBMS_THREAD_SIG SIGUSR1
00052 /*
00053  * ---------------------------------------------------------------
00054  * SIGNAL HANDLERS
00055  */
00056 
00057 extern "C" {
00058 
00059 
00060 static void td_catch_signal(int sig)
00061 {
00062   CSThread *self;
00063 
00064   if ((self = CSThread::getSelf())) {
00065     if (self->isMain()) {
00066       /* The main thread will pass on a signal to all threads: */
00067       if (self->myThreadList)
00068         self->myThreadList->signalAllThreads(sig);
00069       self->setSignalPending(sig);
00070     }
00071   }
00072   
00073 }
00074 
00075 static  void td_throw_signal(int sig)
00076 {
00077   CSThread *self;
00078 
00079   if ((self = CSThread::getSelf())) {
00080     if (self->isMain()) {
00081       /* The main thread will pass on a signal to all threads: */
00082       if (self->myThreadList)
00083         self->myThreadList->signalAllThreads(sig);
00084     }
00085     self->setSignalPending(sig);
00086     self->interrupted();
00087   }
00088 }
00089 
00090 static bool td_setup_signals(CSThread *thread)
00091 {
00092 #ifdef OS_WINDOWS
00093   return true;
00094 #else
00095   struct sigaction action;
00096 
00097     sigemptyset(&action.sa_mask);
00098     action.sa_flags = 0;
00099 
00100     action.sa_handler = td_catch_signal;
00101 
00102   if (sigaction(PBMS_THREAD_SIG, &action, NULL) == -1)
00103     goto error_occurred;
00104 
00105     action.sa_handler = td_throw_signal;
00106 
00107   return true;
00108 
00109   error_occurred:
00110 
00111   if (thread) {
00112     thread->myException.initOSError(CS_CONTEXT, errno);
00113     thread->myException.setStackTrace(thread);
00114   }
00115   else
00116     CSException::throwOSError(CS_CONTEXT, errno);
00117   return false;
00118 #endif
00119 }
00120 
00121 }
00122 
00123 /*
00124  * ---------------------------------------------------------------
00125  * THREAD LISTS
00126  */
00127 
00128 void CSThreadList::signalAllThreads(int sig)
00129 {
00130   CSThread *ptr;
00131 
00132   enter_();
00133   lock_(this);
00134   ptr = (CSThread *) getBack();
00135   while (ptr) {
00136     if (ptr != self)
00137       ptr->signal(sig);
00138     ptr = (CSThread *) ptr->getNextLink();
00139   }
00140   unlock_(this);
00141 
00142   exit_();
00143 }
00144 
00145 void CSThreadList::quitAllThreads()
00146 {
00147   CSThread *ptr;
00148 
00149   enter_();
00150   lock_(this);
00151   
00152   ptr = (CSThread *) getBack();
00153   while (ptr) {
00154     if (ptr != self)
00155       ptr->myMustQuit = true;
00156     ptr = (CSThread *) ptr->getNextLink();
00157   }
00158   
00159   unlock_(this);
00160   exit_();
00161 }
00162 
00163 void CSThreadList::stopAllThreads()
00164 {
00165   CSThread *thread;
00166 
00167   enter_();
00168   for (;;) {
00169     /* Get a thread that is not self! */
00170     lock_(this);
00171     if ((thread = (CSThread *) getBack())) {
00172       while (thread) {
00173         if (thread != self)
00174           break;
00175         thread = (CSThread *) thread->getNextLink();
00176       }
00177     }
00178     if (thread)
00179       thread->retain();
00180     unlock_(this);
00181     
00182     if (!thread)
00183       break;
00184       
00185     push_(thread);
00186     thread->stop();
00187     release_(thread);
00188   }
00189   exit_();
00190 }
00191 
00192 /*
00193  * ---------------------------------------------------------------
00194  * CSTHREAD
00195  */
00196 
00197 void CSThread::addToList()
00198 {
00199   if (myThreadList) {
00200     enter_();
00201     ASSERT(self == this);
00202     lock_(myThreadList);
00203     myThreadList->addFront(self);
00204     isRunning = true;
00205     unlock_(myThreadList);
00206     exit_();
00207   }
00208   else
00209     isRunning = true;
00210 }
00211   
00212 void CSThread::removeFromList()
00213 {
00214   if (myThreadList && isRunning) {
00215     CSThread *myself = this; // pop_() wants to take a reference to its parameter.
00216     enter_();
00217     /* Retain the thread in order to ensure
00218      * that after it is removed from the list,
00219      * that it is not freed! This would make the
00220      * unlock_() call invalid, because it requires
00221      * on the thread.
00222      */
00223     push_(myself);
00224     lock_(myThreadList);
00225     myThreadList->remove(RETAIN(myself));
00226     unlock_(myThreadList);
00227     pop_(myself);
00228     outer_();
00229   }
00230   this->release();
00231 }
00232 
00233 void *CSThread::dispatch(void *arg)
00234 {
00235   CSThread    *self;
00236   void      *return_data = NULL;
00237   int       err;
00238 
00239   /* Get a reference to myself: */
00240   self = reinterpret_cast<CSThread*>(arg);
00241   ASSERT(self);
00242 
00243   /* Store my thread in the thread key: */
00244   if ((err = pthread_setspecific(CSThread::sThreadKey, self))) {
00245     CSException::logOSError(self, CS_CONTEXT, err);
00246     return NULL;
00247   }
00248 
00249   /*
00250    * Make sure the thread is not freed while we
00251    * are running:
00252    */
00253   self->retain();
00254 
00255   try_(a) {
00256     td_setup_signals(NULL);
00257 
00258     /* Add the thread to the list: */
00259     self->addToList();
00260 
00261     // Run the task from the correct context
00262     return_data = self->run();
00263   }
00264   catch_(a) {
00265     self->logException();
00266   }
00267   cont_(a);
00268 
00269   /*
00270    * Removing from the thread list will also release the thread.
00271    */
00272   self->removeFromList();
00273 
00274   // Exit the thread
00275   return return_data;
00276 }
00277 
00278 
00279 extern "C"
00280 {
00281 
00282 static void *dispatch_wrapper(void *arg)
00283 {
00284   return CSThread::dispatch(arg);
00285 }
00286 
00287 }
00288 
00289 void *CSThread::run()
00290 {
00291   if (iRunFunc)
00292     return iRunFunc();
00293   return NULL;
00294 }
00295 
00296 void CSThread::start(bool detached)
00297 {
00298   int err;
00299 
00300   err = pthread_create(&iThread, NULL, dispatch_wrapper, (void *) this);
00301   if (err)
00302     CSException::throwOSError(CS_CONTEXT, err);
00303   while (!isRunning) {
00304     /* Check if the thread is still alive,
00305      * so we don't hang forever.
00306      */
00307     if (pthread_kill(iThread, 0))
00308       break;
00309     usleep(10);
00310   }
00311   
00312   isDetached = detached;
00313   if (detached)
00314     pthread_detach(iThread);
00315 }
00316 
00317 void CSThread::stop()
00318 {
00319   signal(SIGTERM);
00320   join();
00321 }
00322 
00323 void *CSThread::join()
00324 {
00325   void  *return_data = NULL;
00326   int   err;
00327 
00328   enter_();
00329   if (isDetached) {
00330     while (isRunning && !pthread_kill(iThread, 0)) 
00331       usleep(100);
00332   } else if ((err = pthread_join(iThread, &return_data))) {
00333     CSException::throwOSError(CS_CONTEXT, err);
00334   }
00335 
00336   return_(return_data);
00337 }
00338 
00339 void CSThread::setSignalPending(unsigned int sig)
00340 {
00341   if (sig == SIGTERM)
00342     /* The terminate signal takes priority: */
00343     signalPending = SIGTERM;
00344   else if (!signalPending)
00345     /* Otherwise, first signal wins... */
00346     signalPending = sig;
00347 }
00348 
00349 void CSThread::signal(unsigned int sig)
00350 {
00351 #ifndef OS_WINDOWS // Currently you cannot signal threads on windows.
00352   int err;
00353 
00354   setSignalPending(sig);
00355   if ((err = pthread_kill(iThread, PBMS_THREAD_SIG)))
00356   {
00357     /* Ignore the error if the process does not exist! */
00358     if (err != ESRCH) /* No such process */
00359       CSException::throwOSError(CS_CONTEXT, err);
00360   }
00361 #endif
00362 }
00363 
00364 void CSThread::throwSignal()
00365 {
00366   int sig;
00367 
00368   if ((sig = signalPending) && !ignoreSignals) {
00369     signalPending = 0;
00370     CSException::throwSignal(CS_CONTEXT, sig);
00371   }
00372 }
00373 
00374 bool CSThread::isMain()
00375 {
00376   return iIsMain;
00377 }
00378 
00379 /*
00380  * -----------------------------------------------------------------------
00381  * THROWING EXCEPTIONS
00382  */
00383 
00384 /* 
00385  * When an exception is thrown, the objects on the release stack are released or unlocked.
00386  */
00387 
00388 void CSThread::releaseObjects(CSReleasePtr top)
00389 {
00390   CSObject *obj;
00391 
00392   while (relTop > top) {
00393     /* Remove and release or unlock the object on the top of the stack: */
00394     relTop--;
00395     switch(relTop->r_type) {
00396       case CS_RELEASE_OBJECT:
00397         if ((obj = relTop->x.r_object))
00398           obj->release();
00399         break;
00400       case CS_RELEASE_MUTEX:
00401         if (relTop->x.r_mutex)
00402           relTop->x.r_mutex->unlock();
00403         break;
00404       case CS_RELEASE_POOLED:
00405         if (relTop->x.r_pooled)
00406           relTop->x.r_pooled->returnToPool();
00407         break;
00408       case CS_RELEASE_MEM:
00409         if (relTop->x.r_mem)
00410           cs_free(relTop->x.r_mem);
00411         break;
00412       case CS_RELEASE_OBJECT_PTR:
00413         if ((relTop->x.r_objectPtr) && (obj = *(relTop->x.r_objectPtr)))
00414           obj->release();
00415         break;
00416     }
00417   }
00418 }
00419 
00420 /* Throw an already registered error: */
00421 void CSThread::throwException()
00422 {
00423   /* Record the stack trace: */
00424   if (this->jumpDepth > 0 && this->jumpDepth <= CS_JUMP_STACK_SIZE) {
00425     /*
00426      * As recommended by Barry:
00427      * release the objects before we jump!
00428      * This has the advantage that the stack context is still
00429      * valid when the resources are released.
00430      */
00431     releaseObjects(this->jumpEnv[this->jumpDepth-1].jb_res_top);
00432 
00433     /* Then do the longjmp: */
00434     longjmp(this->jumpEnv[this->jumpDepth-1].jb_buffer, 1);
00435   }
00436 }
00437 
00438 void CSThread::logStack(int depth, const char *msg)
00439 {
00440   char buffer[CS_EXC_CONTEXT_SIZE +1];
00441   CSL.lock();
00442   CSL.log(this, CSLog::Trace, msg);
00443   
00444   for (int i= callTop-1; i>=0 && depth; i--, depth--) {
00445     cs_format_context(CS_EXC_CONTEXT_SIZE, buffer,
00446       callStack[i].cs_func, callStack[i].cs_file, callStack[i].cs_line);
00447     strcat(buffer, "\n");
00448     CSL.log(this, CSLog::Trace, buffer);
00449   }
00450   CSL.unlock();
00451 }
00452 
00453 void CSThread::logException()
00454 {
00455   myException.log(this);
00456 }
00457 
00458 /*
00459  * This function is called when an exception is caught.
00460  * It restores the function call top and frees
00461  * any resource allocated by lower levels.
00462  */
00463 void CSThread::caught()
00464 {
00465   /* Restore the call top: */
00466   this->callTop = this->jumpEnv[this->jumpDepth].jb_call_top;
00467 
00468   /* 
00469    * Release all all objects that were pushed after
00470    * this jump position was set:
00471    */
00472   releaseObjects(this->jumpEnv[this->jumpDepth].jb_res_top);
00473 }
00474 
00475 /*
00476  * ---------------------------------------------------------------
00477  * STATIC METHODS
00478  */
00479 
/* Thread-specific data key under which each thread stores its CSThread pointer. */
pthread_key_t CSThread::sThreadKey;
/* True once startUp() has created sThreadKey; set back to false by shutDown(). */
bool      CSThread::isUp = false;
00482 
00483 bool CSThread::startUp()
00484 {
00485   int err;
00486 
00487   isUp = false;
00488   if ((err = pthread_key_create(&sThreadKey, NULL))) {
00489     CSException::logOSError(CS_CONTEXT, errno);
00490   } else
00491     isUp = true;
00492     
00493   return isUp;
00494 }
00495 
00496 void CSThread::shutDown()
00497 {
00498   isUp = false;
00499 }
00500 
00501 bool CSThread::attach(CSThread *thread)
00502 {
00503   ASSERT(!getSelf());
00504   
00505   if (!thread) {
00506     CSException::logOSError(CS_CONTEXT, ENOMEM);
00507     return false;
00508   }
00509 
00510   if (!setSelf(thread))
00511     return false;
00512 
00513   /* Now we are ready to receive signals: */
00514   if (!td_setup_signals(thread))
00515     return false;
00516 
00517   thread->addToList();
00518   thread->retain();
00519   return true;
00520 }
00521 
00522 void CSThread::detach(CSThread *thread)
00523 {
00524   ASSERT(!getSelf() || getSelf() == thread);
00525   thread->removeFromList();
00526   thread->release();
00527   pthread_setspecific(sThreadKey, NULL);
00528 }
00529 
00530 CSThread* CSThread::getSelf()
00531 {
00532   CSThread* self = NULL;
00533   
00534   if ((!isUp) || !(self = (CSThread*) pthread_getspecific(sThreadKey)))
00535     return (CSThread*) NULL;
00536     
00537 #ifdef DEBUG
00538   /* PMC - Problem is, if this is called when releasing a
00539    * thread, then we have the reference count equal to
00540    * zero.
00541   if (self && !self->iRefCount) {
00542     pthread_setspecific(sThreadKey, NULL);
00543     CSException::throwAssertion(CS_CONTEXT, "Freed self pointer referenced.");
00544   }
00545   */
00546 #endif
00547 
00548   return self;
00549 }
00550 
00551 bool CSThread::setSelf(CSThread *self)
00552 {
00553   int err;
00554 
00555   if (self) {
00556     self->iThread = pthread_self();
00557 
00558     /* Store my thread in the thread key: */
00559     if ((err = pthread_setspecific(sThreadKey, self))) {
00560       self->myException.initOSError(CS_CONTEXT, err);
00561       self->myException.setStackTrace(self);
00562       return false;
00563     }
00564   }
00565   else
00566     pthread_setspecific(sThreadKey, NULL);
00567   return true;
00568 }
00569 
00570 /* timeout is in milliseconds */
00571 void CSThread::sleep(unsigned long timeout)
00572 {
00573   enter_();
00574   usleep(timeout * 1000);
00575   self->interrupted();
00576   exit_();
00577 }
00578 
00579 #ifdef DEBUG
00580 int cs_assert(const char *func, const char *file, int line, const char *message)
00581 {
00582   CSException::throwAssertion(func, file, line, message);
00583   return 0;
00584 }
00585 
00586 int cs_hope(const char *func, const char *file, int line, const char *message)
00587 {
00588   CSException e;
00589     
00590   e.initAssertion(func, file, line, message);
00591   e.log(NULL);
00592   return 0;
00593 }
00594 #endif
00595 
00596 CSThread *CSThread::newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list)
00597 {
00598   CSThread *thd;
00599 
00600   enter_();
00601   if (!(thd = new CSThread(list))) {
00602     name->release();
00603     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00604   }
00605   thd->threadName = name;
00606   thd->iRunFunc = run_func;
00607   return_(thd);
00608 }
00609 
00610 CSThread *CSThread::newCSThread()
00611 {
00612   CSThread *thd = NULL;
00613 
00614   if (!(thd = new CSThread(NULL))) {
00615     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00616   }
00617   
00618   return thd;
00619 }
00620 
00621 /*
00622  * ---------------------------------------------------------------
00623  * DAEMON THREADS
00624  */
00625 
00626 CSDaemon::CSDaemon(time_t wait_time, CSThreadList *list):
00627 CSThread(list),
00628 CSSync(),
00629 myWaitTime(wait_time),
00630 iSuspended(false),
00631 iSuspendCount(0)
00632 {
00633 }
00634 
00635 CSDaemon::CSDaemon(CSThreadList *list):
00636 CSThread(list),
00637 CSSync(),
00638 myWaitTime(0),
00639 iSuspended(false),
00640 iSuspendCount(0)
00641 {
00642 }
00643 
00644 void CSDaemon::try_Run(CSThread *self, const bool c_must_sleep)
00645 {
00646   try_(a) {
00647      bool must_sleep = c_must_sleep; // This done to avoid longjmp() clobber.
00648     while (!myMustQuit) {
00649       if (must_sleep) {
00650         lock_(this);
00651         if (myWaitTime)
00652           suspendedWait(myWaitTime);
00653         else
00654           suspendedWait();
00655         unlock_(this);
00656         if (myMustQuit)
00657           break;
00658       }
00659       must_sleep = doWork();
00660     }
00661   }
00662   catch_(a) {
00663     if (!handleException())
00664       myMustQuit = true;
00665   }
00666   cont_(a);
00667 }
00668 
00669 void *CSDaemon::run()
00670 {
00671   bool must_sleep = false;
00672 
00673   enter_();
00674 
00675   myMustQuit = !initializeWork();
00676 
00677   while  (!myMustQuit) {
00678     try_Run(self, must_sleep);
00679     must_sleep = true;
00680   }
00681 
00682   /* Prevent signals from going off in completeWork! */
00683   ignoreSignals = true;
00684 
00685   return_(completeWork());
00686 }
00687 
00688 bool CSDaemon::doWork()
00689 {
00690   if (iRunFunc)
00691     (void) iRunFunc();
00692   return true;
00693 }
00694 
00695 bool CSDaemon::handleException()
00696 {
00697   if (!myMustQuit)
00698     logException();
00699   return true;
00700 }
00701 
00702 void CSDaemon::wakeup()
00703 {
00704   CSSync::wakeup();
00705 }
00706 
00707 void CSDaemon::stop()
00708 {
00709   myMustQuit = true;
00710   wakeup();
00711   signal(SIGTERM);
00712   join();
00713 }
00714 
00715 void CSDaemon::suspend()
00716 {
00717   enter_();
00718   lock_(this);
00719   iSuspendCount++;
00720   while (!iSuspended && !myMustQuit)
00721     wait(500);
00722   if (!iSuspended)
00723     iSuspendCount--;
00724   unlock_(this);
00725   exit_();
00726 }
00727 
00728 void CSDaemon::resume()
00729 {
00730   enter_();
00731   lock_(this);
00732   if (iSuspendCount > 0)
00733     iSuspendCount--;
00734   wakeup();
00735   unlock_(this);
00736   exit_();
00737 }
00738 
00739 void CSDaemon::suspended()
00740 {
00741   if (!iSuspendCount || myMustQuit) {
00742     iSuspended = false;
00743     return;
00744   }
00745   enter_();
00746   lock_(this);
00747   while (iSuspendCount && !myMustQuit) {
00748     iSuspended = true;
00749     wait(500);
00750   }
00751   iSuspended = false;
00752   unlock_(this);
00753   exit_();
00754 }
00755 
00756 void CSDaemon::suspendedWait()
00757 {
00758   iSuspended = true;
00759   wait();
00760   if (iSuspendCount)
00761     suspended();
00762 }
00763 
00764 void CSDaemon::suspendedWait(time_t milli_sec)
00765 {
00766   iSuspended = true;
00767   wait(milli_sec);
00768   if (iSuspendCount)
00769     suspended();
00770   else
00771     iSuspended = false;
00772 }
00773 
00774 /*
00775  * ---------------------------------------------------------------
00776  * THREAD POOLS
00777  */
00778 
00779