Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __CS_CSUTIL_THREADJOBQUEUE_H__
00022 #define __CS_CSUTIL_THREADJOBQUEUE_H__
00023
00028 #include "csextern.h"
00029 #include "csutil/fifo.h"
00030 #include "csutil/scf_implementation.h"
00031 #include "csutil/csstring.h"
00032 #include "iutil/job.h"
00033
00034 #include "csutil/threading/condition.h"
00035 #include "csutil/threading/mutex.h"
00036 #include "csutil/threading/thread.h"
00037
00038 namespace CS
00039 {
00040 namespace Threading
00041 {
00042
00043 class CS_CRYSTALSPACE_EXPORT ThreadedJobQueue :
00044 public scfImplementation1<ThreadedJobQueue, iJobQueue>
00045 {
00046 public:
00055 ThreadedJobQueue (size_t numWorkers = 1, ThreadPriority priority = THREAD_PRIO_NORMAL,
00056 const char* name = 0);
00057 virtual ~ThreadedJobQueue ();
00058
00059 virtual void Enqueue (iJob* job);
00060 virtual JobStatus Dequeue (iJob* job, bool waitForCompletion);
00061 virtual JobStatus PullAndRun (iJob* job, bool waitForCompletion = true);
00062 virtual bool IsFinished ();
00063 virtual int32 GetQueueCount();
00064 virtual void WaitAll ();
00065
00067 const char* GetName () const { return name; }
00068 private:
00069
00070 bool PullFromQueues (iJob* job);
00071 JobStatus CheckCompletion (iJob* job, bool waitForCompletion);
00072
00073
00074 struct ThreadState;
00075
00076 class QueueRunnable : public Runnable
00077 {
00078 public:
00079 QueueRunnable (ThreadedJobQueue* queue, ThreadState* ts, unsigned int id);
00080
00081 virtual void Run ();
00082 virtual const char* GetName () const;
00083 private:
00084 friend class ThreadedJobQueue;
00085
00086 ThreadedJobQueue* ownerQueue;
00087 int32 shutdownQueue;
00088 csRef<ThreadState> threadState;
00089 csString name;
00090 };
00091
00092
00093 struct ThreadState : public CS::Utility::AtomicRefCount
00094 {
00095 ThreadState (ThreadedJobQueue* queue, unsigned int id)
00096 {
00097 runnable.AttachNew (new QueueRunnable (queue, this, id));
00098 threadObject.AttachNew (new Thread (runnable, false));
00099 }
00100
00101 csRef<QueueRunnable> runnable;
00102 csRef<Thread> threadObject;
00103 csRef<iJob> currentJob;
00104
00105
00106 Mutex tsMutex;
00107 Condition tsNewJob;
00108 Condition tsJobFinished;
00109
00110 csFIFO<csRef<iJob> > jobQueue;
00111 };
00112
00113 csRef<ThreadState>* allThreadState;
00114 ThreadGroup allThreads;
00115
00116 Mutex finishMutex;
00117
00118 size_t numWorkerThreads;
00119 int32 outstandingJobs;
00120 csString name;
00121 };
00122
00123 }
00124 }
00125
00126 typedef CS::Threading::ThreadedJobQueue csThreadJobQueue;
00127
00128
00129 #endif // __CS_CSUTIL_THREADJOBQUEUE_H__