As far as C++ multithreaded programming is concerned, an active object is an instance of a class which internally creates and manages its own thread or threads. The active object must handle all necessary synchronization with the threads it creates, because the threads are not visible to the clients of the object. Essentially, an active object is like a little program of its own, except that it lives in the address space of an application.
Active objects can be a very handy mechanism for encapsulating all sorts of tasks. For example, an object can be designed as a job queue processor. The containing application can post jobs to the active object, which will use its own thread to read jobs off the queue and process those jobs. This concept can be extended even further by having the active object create and manage a collection of threads to use for processing jobs. This turns the active object into a multithreaded job server wrapped in a C++ class.
Another use for active objects is executing code that needs to run at regular intervals. The internal thread can regularly execute the necessary code with almost no dependence upon the activities of the rest of the application. In old-style Win16 code, programmers usually accomplished this by creating a hidden window and regularly posting messages to it from elsewhere in the code or with a timer. The processing was done in the handler for the posted message. This approach has two drawbacks. First, creating a hidden window and posting messages to it has overhead which may be inappropriate for the task. Second, since messages are processed in order, the code which needs to be regularly executed will be run at irregular intervals, subject to the vagaries of the message processing rate of the rest of the program. A long operation elsewhere in the program will cause the code which is supposed to run at regular intervals to execute late. This can be a real problem when irregular timing is visible to the user, as in animation, where regularly executed code paints sequential frames to give the illusion of motion.
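As a rough illustration of the interval idea, the sketch below runs a callback on a private thread once per period. It is written with standard C++11 threading facilities rather than the Win32 calls and Mcl classes used in this chapter, and the names PeriodicTask, Stop, and TickCount are hypothetical, chosen only for this example.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical class: runs `work` on a private thread once per `period`.
class PeriodicTask {
public:
    PeriodicTask(std::chrono::milliseconds period, std::function<void()> work)
        : m_period(period), m_work(std::move(work)), m_stop(false), m_ticks(0) {
        assert(m_work != nullptr);
        m_thread = std::thread(&PeriodicTask::Run, this);
    }
    ~PeriodicTask() { Stop(); }
    PeriodicTask(const PeriodicTask &) = delete;
    PeriodicTask &operator=(const PeriodicTask &) = delete;

    // Ask the internal thread to exit and wait for it.
    // Calling it a second time from the same thread is harmless.
    void Stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_wake.notify_one();
        if (m_thread.joinable()) m_thread.join();
    }

    // Number of times the callback has completed so far.
    unsigned TickCount() const { return m_ticks.load(); }

private:
    void Run() {
        auto next = std::chrono::steady_clock::now() + m_period;
        std::unique_lock<std::mutex> lock(m_mutex);
        // wait_until yields false when the deadline passes with no stop request
        while (!m_wake.wait_until(lock, next, [this] { return m_stop; })) {
            lock.unlock();
            m_work();            // run the periodic code without holding the lock
            ++m_ticks;
            lock.lock();
            next += m_period;    // schedule from the previous deadline to limit drift
        }
    }

    const std::chrono::milliseconds m_period;
    std::function<void()> m_work;
    std::mutex m_mutex;
    std::condition_variable m_wake;
    bool m_stop;
    std::atomic<unsigned> m_ticks;
    std::thread m_thread;   // declared last so the other members exist before Run starts
};
```

Because the thread waits on its own deadline instead of a message queue, a long operation elsewhere in the program does not directly delay the callback. Operating system scheduling still decides how closely each deadline is met.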
Yet another use for active objects is to make slow operations asynchronous. With single-threaded programming, a slow operation, such as initializing a hardware device, can prevent the rest of the program from executing until the device initialization is complete. With multithreading and object-oriented programming, a C++ object can encapsulate the functionality of the device, and an internal thread can be responsible for initializing the hardware, perhaps acquiring an internal critical section when it begins and releasing the critical section when it completes. Any other member functions that access the hardware can also acquire and release this critical section. What this accomplishes is asynchronous initialization of the hardware. A client program can create the active object, call the Initialize member function, and go off and initialize the rest of the program. When the program actually gets around to using the device by calling device-specific member functions, the client will be blocked by the critical section until the device is initialized. This technique can be used with multiple devices to perform all slow program initialization in parallel, which can significantly shorten the start-up time of your software.
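A minimal sketch of asynchronous initialization follows, again in standard C++11 rather than Win32. The class AsyncDevice and its Read function are hypothetical, and the hardware is simulated with a short sleep. One deliberate difference from the description above: instead of having the initialization thread hold a lock for the whole initialization, the sketch uses a ready flag and a condition variable, so a client call that arrives before the internal thread has even started running still waits.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical device wrapper; the "hardware" is simulated with a sleep.
class AsyncDevice {
public:
    AsyncDevice() : m_ready(false), m_value(0) {}
    ~AsyncDevice() {
        if (m_initThread.joinable()) m_initThread.join();
    }
    AsyncDevice(const AsyncDevice &) = delete;
    AsyncDevice &operator=(const AsyncDevice &) = delete;

    // Returns at once; the slow work continues on the internal thread.
    void Initialize() {
        assert(!m_initThread.joinable());   // call only once
        m_initThread = std::thread([this] {
            // stand-in for slow hardware setup
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                m_value = 42;               // pretend result of initialization
                m_ready = true;
            }
            m_initialized.notify_all();
        });
    }

    // Device-specific call: blocks until initialization has completed.
    // Calling it without a prior Initialize() would block forever.
    int Read() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_initialized.wait(lock, [this] { return m_ready; });
        return m_value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_initialized;
    bool m_ready;
    int m_value;
    std::thread m_initThread;
};
```

A client would construct several such objects, call Initialize on each, carry on with its own start-up work, and only pay the waiting cost when it first calls a function such as Read.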
The design of an active object is largely determined by the problem being solved and the efficiencies required. Some active objects need to be started and stopped by the client. While it may not be necessary to have explicit Start and Stop functions, many times some of the member functions provided by the active object for the client will implicitly activate and deactivate the object. Those functions are good places to create and destroy the active object’s internal threads.
Other kinds of active objects may need a thread or threads to exist throughout the lifetime of the object. Still other objects may dynamically create or destroy threads based on some transient internal object state. The tradeoffs are largely a matter of comparing the overhead of thread creation to the cost of keeping created threads around until you need them. You can always block a created thread on an unsignaled kernel object until the thread has something to do, but sometimes this is inconvenient. It is important to understand the run-time characteristics of an active object to determine the most efficient design for a particular task.
To illustrate the alternatives in developing an active object, we have implemented two designs for an active object. The active object is a Deferred Processing Queue, or DPQ. This object combines features of a job queue with an internal thread to process submitted jobs asynchronously with respect to the client. To use a DPQ, you create an instance of the DPQ class (CDPQueue1 or CDPQueue2 in the listings in this section) and use the QueueJob member function to submit jobs to the object for processing. Each job is an instance of a class derived from CJob. CJob is an abstract base class whose children must implement the Execute and Cancel member functions. The DPQ retrieves the object from its internal queue, a CJobList, and invokes the object’s Execute member function. The Cancel function is invoked when a submitted job is removed from the queue but not executed, for example, if the client removes the job by using the CancelJob member function. Internally the DPQ manages its own worker thread, which takes jobs off its internal queue and executes them. The DPQ is a very general kind of active object and can be used for many kinds of asynchronous processing. All of the CJobs submitted to an instance of a DPQ don’t even have to be of the same type!
Figure 9-1. Graphical representation of a deferred processing queue (DPQ).
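To make the Execute and Cancel contract concrete before the Mcl-based listings, here is a small portable sketch of the same idea in standard C++11. It is not the chapter's implementation: Job, RecordingJob, Tally, and MiniQueue are hypothetical names, and unlike the print jobs in this chapter, these jobs are owned by the caller and do not delete themselves.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Portable stand-in for the CJob interface described in the text.
struct Job {
    virtual ~Job() {}
    virtual void Execute() = 0;   // called when the queue processes the job
    virtual void Cancel() = 0;    // called when the job is removed unprocessed
};

// Hypothetical helpers that record how each job was handled.
struct Tally {
    std::atomic<int> executed{0};
    std::atomic<int> canceled{0};
};
struct RecordingJob : Job {
    explicit RecordingJob(Tally &t) : tally(t) {}
    void Execute() override { ++tally.executed; }
    void Cancel() override { ++tally.canceled; }
    Tally &tally;
};

// Hypothetical single-worker queue with explicit Start and Stop.
class MiniQueue {
public:
    MiniQueue() : m_run(false) {}
    ~MiniQueue() { Stop(); }
    bool Start() {
        if (m_worker.joinable()) return false;     // redundant start
        m_run = true;
        m_worker = std::thread(&MiniQueue::Run, this);
        return true;
    }
    bool Stop() {
        if (!m_worker.joinable()) return false;    // redundant stop
        { std::lock_guard<std::mutex> lock(m_mutex); m_run = false; }
        m_wake.notify_one();
        m_worker.join();                           // wait for the worker to exit
        return true;
    }
    void QueueJob(Job *job) {
        assert(job != nullptr);
        { std::lock_guard<std::mutex> lock(m_mutex); m_jobs.push_back(job); }
        m_wake.notify_one();
    }
    bool CancelJob(Job *job) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            auto it = std::find(m_jobs.begin(), m_jobs.end(), job);
            if (it == m_jobs.end()) return false;  // already taken or never queued
            m_jobs.erase(it);
        }
        job->Cancel();                             // call back outside the lock
        return true;
    }
    void CancelAllJobs() {
        std::deque<Job *> pending;
        { std::lock_guard<std::mutex> lock(m_mutex); pending.swap(m_jobs); }
        for (Job *job : pending) job->Cancel();
    }
private:
    void Run() {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;) {
            m_wake.wait(lock, [this] { return !m_run || !m_jobs.empty(); });
            if (!m_run) break;                     // stop even if jobs remain queued
            Job *job = m_jobs.front();
            m_jobs.pop_front();
            lock.unlock();
            job->Execute();                        // run the job without holding the lock
            lock.lock();
        }
    }
    std::mutex m_mutex;
    std::condition_variable m_wake;
    std::deque<Job *> m_jobs;
    bool m_run;
    std::thread m_worker;
};
```

Each job ends up with exactly one of the two calls: Execute if the worker reaches it, Cancel if the client removes it first. A job that is still queued when Stop returns receives neither until the client calls CancelAllJobs.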
We’ll be using the well-worn example of a print queue so that we can concentrate on the details of the implementation of the DPQ and not worry too much about the actual work done by the individual jobs. The first implementation of a DPQ will create an internal worker thread when the Start method of the DPQ is invoked and destroy the thread when the Stop method is called. Example 9-2 is the implementation file for the first version of a DPQ.
Example 9-2. DPQ1.cpp, the first example of a deferred processing queue active object
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "CMcl.h"
class CJob {
    // this class has only pure virtual functions
    // and must be derived from to use with the
    // CJobList object...
public:
    // execute method will be called by the
    // deferred processing queue to process
    // this job...
    virtual void Execute(void) = 0;
    // cancel method will be called by the
    // deferred processing queue when this job
    // is removed from the queue without being executed...
    virtual void Cancel(void) = 0;
};
class CJobList : private CMclLinkedList<CJob *> {
public:
    BOOL QueueJob( CJob *pJob) {
        return PutOnTailOfList(pJob);
    };
    BOOL DequeueJob( CJob **pJob, CMclEvent *pInterrupt = NULL, DWORD dwTimeout = INFINITE) {
        return GetFromHeadOfList( *pJob, dwTimeout, pInterrupt);
    };
    BOOL RemoveJob( CJob *pJob) {
        // decrease the semaphore count...
        // if the wait fails, there are no jobs in the queue...
        if (CMclWaitTimeout(m_csNotEmpty.Wait(0))) {
            return FALSE;
        }
        // acquire the critical section lock...
        CMclAutoLock autoLock(m_cCritSec);
        // search the list for a matching job...
        CMclLinkedListNode *pNode = m_MasterNode.m_pNext;
        while (pNode != &m_MasterNode) {
            // since pNode is not the master node, we know that
            // it points to a data node, so this cast is safe...
            if ((static_cast<CMclLinkedListDataNode *>(pNode))->m_data == pJob)
                break;
            // no match, advance to the next node in the list...
            pNode = pNode->m_pNext;
        }
        // if we found a match, remove it...
        if (pNode != &m_MasterNode) {
            // remove the node...
            pNode->m_pPrev->m_pNext = pNode->m_pNext;
            pNode->m_pNext->m_pPrev = pNode->m_pPrev;
            // add the list node to the free list...
            AddToFreeList(pNode);
            // return TRUE when we remove a node...
            return TRUE;
        }
        else {
            // return FALSE when there is no matching
            // node to remove, we need to bump the semaphore
            // count back up...
            m_csNotEmpty.Release(1);
            return FALSE;
        }
    };
};
class CDPQueue1 {
private:
    class CDPWorkerThreadHandler : public CMclThreadHandler {
    private:
        CJobList *m_pcJobList; // pointer to job list passed from owner DPQ
        CMclEvent m_ceControl; // auto-reset event
        BOOL m_bRun; // should the worker thread continue?
    public:
        CDPWorkerThreadHandler(CJobList *pcJobList) {
            m_pcJobList = pcJobList;
            m_bRun = TRUE;
        };
        void Stop(void) {
            m_bRun = FALSE;
            m_ceControl.Set();
        };
        unsigned ThreadHandlerProc(void) {
            // m_bRun will allow the handler to exit even if there is always
            // work in the queue...
            CJob *pJob;
            while (m_pcJobList->DequeueJob( &pJob, &m_ceControl, INFINITE)) {
                pJob->Execute();
                if (!m_bRun)
                    break;
            }
            return 0;
        };
    };
private:
    CJobList m_cJobList; // list of jobs to process
    CMclThreadAutoPtr m_apWorkerThread; // worker thread auto-pointer
    CDPWorkerThreadHandler m_dpHandler; // thread handler object for worker thread
public:
    CDPQueue1() : m_dpHandler( &m_cJobList) {
        return;
    };
    BOOL Start(void) {
        // check for redundant start...
        if (m_apWorkerThread.IsNull() == FALSE) {
            return FALSE;
        }
        // create the worker thread...
        m_apWorkerThread = new CMclThread( &m_dpHandler);
        printf("DPQ1 thread created.\n");
        return TRUE;
    };
    BOOL Stop(void) {
        // check for redundant stop...
        if (m_apWorkerThread.IsNull() == TRUE) {
            return FALSE;
        }
        // signal the worker thread to stop and wait
        // for it to exit...
        m_dpHandler.Stop();
        m_apWorkerThread->Wait(INFINITE);
        printf("DPQ1 thread exited.\n");
        return TRUE;
    };
    BOOL QueueJob( CJob *pJob) {
        // add the job to the queue...
        return m_cJobList.QueueJob(pJob);
    };
    BOOL CancelJob( CJob *pJob) {
        // remove a particular job from the queue...
        if (m_cJobList.RemoveJob(pJob) == TRUE) {
            pJob->Cancel();
            return TRUE;
        }
        else {
            return FALSE;
        }
    };
    void CancelAllJobs(void) {
        // remove all jobs from the queue...
        CJob *pJob;
        while (m_cJobList.DequeueJob( &pJob, NULL, 0)) {
            pJob->Cancel();
        }
    };
};
class CPrintJob : public CJob {
private:
    TCHAR *m_lpString;
    BOOL m_bInUse;
public:
    // constructor...
    CPrintJob(LPCTSTR lpString) {
        // initialize the print job...
        m_lpString = NULL;
        if (lpString) {
            m_lpString = new TCHAR[strlen(lpString) + 1];
            strcpy( m_lpString, lpString);
        }
    };
    // virtual destructor...
    virtual ~CPrintJob() {
        if (m_lpString) {
            delete [] m_lpString;
        }
    };
    void Execute(void) {
        // execute the print job...
        if (m_lpString) {
            printf( "Executing job <%s>.\n", m_lpString);
        }
        // make the job take a small amount of time...
        Sleep(50);
        // jobs delete themselves when complete...
        delete this;
    };
    void Cancel(void) {
        // cancel the print job...
        if (m_lpString) {
            printf( "Canceling job <%s>.\n", m_lpString);
        }
        // jobs delete themselves when canceled...
        delete this;
    };
};
int main( int argc, char *argv[]) {
    CDPQueue1 dpq;
    CPrintJob *pPrintJob;
    TCHAR string[32];
    // start the deferred processing queue...
    dpq.Start();
    // post 100 jobs at irregular intervals
    // between 1 and 99 milliseconds...
    for (int index = 0; index < 100; index++) {
        Sleep((rand() % 99) + 1);
        sprintf( string, "This is job %d", index);
        pPrintJob = new CPrintJob(string);
        dpq.QueueJob(pPrintJob);
    }
    // stop the deferred processing queue...
    dpq.Stop();
    // cancel any jobs remaining...
    dpq.CancelAllJobs();
    return 0;
}
The first thing done in DPQ1.cpp is to declare the CJob abstract base class. Any job object that the DPQ will process has to be derived from CJob. There are no data members.
The next class, CJobList, is very important. It is derived from the Mcl library template class CMclLinkedList, which provides thread-synchronized queues. In this case, we need a class which will allow multiple threads to add and remove pointers to CJob objects simultaneously. We could almost use the CMclLinkedList<CJob *> class directly, but we want to provide a method of removing particular jobs from the queue, given that a client has kept a pointer to a submitted CJob. In order to do this, we needed to add the RemoveJob function. Since the internal member objects of the CMclLinkedList class were all declared protected, derived classes such as CJobList are free to manipulate the internal data structures as necessary, so we can use the internal m_MasterNode to get the pointer to the first element in the list and run over the entire list, looking for a node whose data member is equal to the CJob pointer we wish to remove. When we find the correct node, we remove it from the doubly linked list by pointing the previous node to the next node and vice versa. Because we are manipulating the internal list state and not just using the public member functions, we need to acquire the list critical section object, m_cCritSec, and correctly manipulate the m_csNotEmpty semaphore, which the list object uses to make threads calling the Get functions wait until there is an item in the list.
The actual CDPQueue1 class is defined next. When the internal worker thread is constructed, it needs to be given a thread handler object, so the first thing that the CDPQueue1 class does is define and implement the CDPWorkerThreadHandler. The thread handler includes the ThreadHandlerProc that will be the main loop of the worker thread, and a Stop function which is used by the CDPQueue1 class to tell the worker thread to exit. The ThreadHandlerProc simply runs in a loop waiting infinitely for a job to appear on the queue or the m_ceControl event to become signaled. If a job appears in the queue, its pointer is copied into pJob, and the DequeueJob function returns TRUE. If the function returns because the m_ceControl event was signaled, DequeueJob returns FALSE, the thread handler returns, and the thread exits. The variable m_bRun allows the worker thread to exit even when there are always jobs in the queue. Even if DequeueJob returns TRUE, if m_bRun has been set to FALSE, the loop will exit once the job just retrieved has been executed.
The CDPWorkerThreadHandler includes a Stop function so that the CDPQueue1 class can force the worker thread to exit. It simply sets the m_bRun member to FALSE and signals the internal event. Notice that this is an asynchronous Stop function. The client will have to wait on the thread kernel object to ensure that it has exited.
The rest of the implementation of CDPQueue1 is straightforward. The object contains an internal CJobList member, m_cJobList, which is used to post jobs to the worker thread. A pointer to this job list is passed to the constructor for m_dpHandler, the CDPWorkerThreadHandler thread handler member variable, so that the worker thread can retrieve jobs from the queue. Finally, an auto-pointer, m_apWorkerThread, will be used to manage the worker thread object.
The CDPQueue1::Start function simply checks if a worker thread exists and, if it doesn’t, creates one. The CDPQueue1::Stop function is almost the opposite, checking if a worker thread exists and stopping it if it does. Since the CDPWorkerThreadHandler::Stop function is asynchronous, the CDPQueue1::Stop function waits on the thread object until the thread has exited.
The QueueJob function simply passes the CJob pointer to the QueueJob function of the CJobList member object. The CancelJob function is similar: it calls RemoveJob on the CJobList member and, if the job was removed, invokes the job’s Cancel function. The CancelAllJobs function is a little more complex. It has to dequeue jobs off the CJobList until there are no more, calling the Cancel virtual function on each job. Since the CJobList provides internal synchronization, there is no problem dequeuing jobs while the worker thread is running. However, it will usually make more sense to call the Stop function before invoking CancelAllJobs.
The rest of the file implements a simple test program for the CDPQueue1 class. The CPrintJob class is derived from CJob, and the Execute and Cancel methods are implemented. In this simple example, the CPrintJob objects manage their own lifetimes; they delete themselves after they have been processed, either executed or canceled. Since this is a test program, the job prints a message describing how it was handled.
The main function simply creates a CDPQueue1 object dpq and calls dpq.Start(). Then one hundred jobs are dynamically allocated and posted to the DPQ at irregular intervals. After a hundred jobs have been posted, the DPQ is stopped and all pending jobs are canceled, which prevents any memory leaks due to created jobs which were posted but not executed. Finally, the program returns.
Some sample output from DPQ1.exe is shown in Example 9-3.
Example 9-3. Sample output from DPQ1.exe
DPQ1 thread created.
Executing job <This is job 0>.
Executing job <This is job 1>.
Executing job <This is job 2>.
Executing job <This is job 3>.
Executing job <This is job 4>.
Executing job <This is job 5>.
Executing job <This is job 6>.
Executing job <This is job 7>.
Executing job <This is job 8>.
Executing job <This is job 9>.
Executing job <This is job 10>.
Executing job <This is job 11>.
Executing job <This is job 12>.
Executing job <This is job 13>.
Executing job <This is job 14>.
Executing job <This is job 15>.
Executing job <This is job 16>.
Executing job <This is job 17>.
Executing job <This is job 18>.
Executing job <This is job 19>.
Executing job <This is job 20>.
Executing job <This is job 21>.
Executing job <This is job 22>.
Executing job <This is job 23>.
Executing job <This is job 24>.
Executing job <This is job 25>.
Executing job <This is job 26>.
Executing job <This is job 27>.
Executing job <This is job 28>.
Executing job <This is job 29>.
Executing job <This is job 30>.
Executing job <This is job 31>.
Executing job <This is job 32>.
Executing job <This is job 33>.
Executing job <This is job 34>.
Executing job <This is job 35>.
Executing job <This is job 36>.
Executing job <This is job 37>.
Executing job <This is job 38>.
Executing job <This is job 39>.
Executing job <This is job 40>.
Executing job <This is job 41>.
Executing job <This is job 42>.
Executing job <This is job 43>.
Executing job <This is job 44>.
Executing job <This is job 45>.
Executing job <This is job 46>.
Executing job <This is job 47>.
Executing job <This is job 48>.
Executing job <This is job 49>.
Executing job <This is job 50>.
Executing job <This is job 51>.
Executing job <This is job 52>.
Executing job <This is job 53>.
Executing job <This is job 54>.
Executing job <This is job 55>.
Executing job <This is job 56>.
Executing job <This is job 57>.
Executing job <This is job 58>.
Executing job <This is job 59>.
Executing job <This is job 60>.
Executing job <This is job 61>.
Executing job <This is job 62>.
Executing job <This is job 63>.
Executing job <This is job 64>.
Executing job <This is job 65>.
Executing job <This is job 66>.
Executing job <This is job 67>.
Executing job <This is job 68>.
Executing job <This is job 69>.
Executing job <This is job 70>.
Executing job <This is job 71>.
Executing job <This is job 72>.
Executing job <This is job 73>.
Executing job <This is job 74>.
Executing job <This is job 75>.
Executing job <This is job 76>.
Executing job <This is job 77>.
Executing job <This is job 78>.
Executing job <This is job 79>.
Executing job <This is job 80>.
Executing job <This is job 81>.
Executing job <This is job 82>.
Executing job <This is job 83>.
Executing job <This is job 84>.
Executing job <This is job 85>.
Executing job <This is job 86>.
Executing job <This is job 87>.
Executing job <This is job 88>.
Executing job <This is job 89>.
Executing job <This is job 90>.
Executing job <This is job 91>.
Executing job <This is job 92>.
Executing job <This is job 93>.
Executing job <This is job 94>.
Executing job <This is job 95>.
Executing job <This is job 96>.
Executing job <This is job 97>.
DPQ1 thread exited.
Canceling job <This is job 98>.
Canceling job <This is job 99>.
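The node removal that RemoveJob performs on the list's internal data can be sketched in isolation. The version below is a simplified, single-threaded stand-in: the types Node, DataNode, and SentinelList are hypothetical and only modeled on the description of the Mcl list, and the sketch leaves out the critical section, the semaphore, and the free list that the real CJobList has to deal with.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical node types modeled on the description; not the Mcl classes.
struct Node {
    Node *prev;
    Node *next;
};
struct DataNode : Node {
    int data;
};

// Circular doubly linked list whose master (sentinel) node holds no data.
class SentinelList {
public:
    SentinelList() { m_master.prev = m_master.next = &m_master; }
    ~SentinelList() {
        while (m_master.next != &m_master) Unlink(m_master.next);
    }
    SentinelList(const SentinelList &) = delete;
    SentinelList &operator=(const SentinelList &) = delete;

    void PushBack(int value) {
        DataNode *node = new DataNode;
        node->data = value;
        node->prev = m_master.prev;
        node->next = &m_master;
        m_master.prev->next = node;
        m_master.prev = node;
    }

    // Remove the first node holding `value`; returns false if none matches.
    bool Remove(int value) {
        Node *node = m_master.next;
        while (node != &m_master) {
            // any node other than the master is a data node, so the cast is safe
            if (static_cast<DataNode *>(node)->data == value) break;
            node = node->next;   // advance, or the search would never finish
        }
        if (node == &m_master) return false;   // walked the whole ring, no match
        Unlink(node);
        return true;
    }

    std::size_t Size() const {
        std::size_t count = 0;
        for (const Node *n = m_master.next; n != &m_master; n = n->next) ++count;
        return count;
    }

private:
    void Unlink(Node *node) {
        node->prev->next = node->next;   // previous node skips over this one
        node->next->prev = node->prev;   // next node points back past it
        delete static_cast<DataNode *>(node);
    }
    Node m_master;
};
```

Because the list is circular and the master node is always present, removal needs no special cases for the head, the tail, or a single-element list.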
The second implementation, shown in Example 9-4, will create the worker thread when a job first appears in the queue. After the queue has been empty for a while, the worker thread will be destroyed. If another job later appears in the queue, another worker thread will be created. This implementation is trickier because there are some subtle thread synchronization issues to solve.
Example 9-4. DPQ2.cpp, second example of a deferred processing queue active object
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "CMcl.h"
class CJob {
    // this class has only pure virtual functions
    // and must be derived from to use with the
    // CJobList object...
public:
    // execute method will be called by the
    // deferred processing queue to process
    // this job...
    virtual void Execute(void) = 0;
    // cancel method will be called by the
    // deferred processing queue when this job
    // is removed from the queue without being executed...
    virtual void Cancel(void) = 0;
};
class CJobList : private CMclLinkedList<CJob *> {
public:
    BOOL QueueJob( CJob *pJob) {
        return PutOnTailOfList(pJob);
    };
    BOOL DequeueJob( CJob **pJob, CMclEvent *pInterrupt = NULL, DWORD dwTimeout = INFINITE) {
        return GetFromHeadOfList( *pJob, dwTimeout, pInterrupt);
    };
    BOOL RemoveJob( CJob *pJob) {
        // decrease the semaphore count...
        // if the wait fails, there are no jobs in the queue...
        if (CMclWaitTimeout(m_csNotEmpty.Wait(0))) {
            return FALSE;
        }
        // acquire the critical section lock...
        CMclAutoLock autoLock(m_cCritSec);
        // search the list for a matching job...
        CMclLinkedListNode *pNode = m_MasterNode.m_pNext;
        while (pNode != &m_MasterNode) {
            // since pNode is not the master node, we know that
            // it points to a data node, so this cast is safe...
            if ((static_cast<CMclLinkedListDataNode *>(pNode))->m_data == pJob)
                break;
            // no match, advance to the next node in the list...
            pNode = pNode->m_pNext;
        }
        // if we found a match, remove it...
        if (pNode != &m_MasterNode) {
            // remove the node...
            pNode->m_pPrev->m_pNext = pNode->m_pNext;
            pNode->m_pNext->m_pPrev = pNode->m_pPrev;
            // add the list node to the free list...
            AddToFreeList(pNode);
            // return TRUE when we remove a node...
            return TRUE;
        }
        else {
            // return FALSE when there is no matching
            // node to remove, we need to bump the semaphore
            // count back up...
            m_csNotEmpty.Release(1);
            return FALSE;
        }
    };
};
class CDPQueue2 : public CMclThreadHandler {
private:
    CJobList m_cJobList; // list of pending jobs
    CMclThreadAutoPtr m_apWorkerThread; // auto-pointer to worker thread
    CMclEvent m_ceControl; // auto-reset event to control thread
    BOOLEAN m_bThreadExists; // flag is TRUE when worker thread exists
    BOOLEAN m_bJobPosted; // flag is TRUE between a job being queued and processed
    CMclCritSec m_CritSec; // synchronize access to m_bThreadExists and m_bJobPosted
    DWORD m_dwTimeout; // timeout value passed in the constructor
private:
    unsigned ThreadHandlerProc(void) {
        CJob *pJob;
        while (TRUE) {
            if (m_cJobList.DequeueJob( &pJob, &m_ceControl, m_dwTimeout)) {
                pJob->Execute();
                CMclAutoLock lock(m_CritSec);
                m_bJobPosted = FALSE;
            }
            else {
                CMclAutoLock lock(m_CritSec);
                if (!m_bJobPosted) {
                    m_bThreadExists = FALSE;
                    break;
                }
            }
        }
        printf("DPQ2 thread exiting.\n");
        return 0;
    };
public:
    CDPQueue2( DWORD dwTimeout = 1000) {
        m_bThreadExists = FALSE;
        m_bJobPosted = FALSE;
        m_dwTimeout = dwTimeout;
        return;
    };
    ~CDPQueue2() {
        // if no job was ever queued, no worker thread was
        // created and there is nothing to wait for...
        if (m_apWorkerThread.IsNull() == FALSE) {
            m_ceControl.Set();
            m_apWorkerThread->Wait(INFINITE);
        }
    }
    BOOL QueueJob( CJob *pJob) {
        BOOL bStatus = m_cJobList.QueueJob(pJob);
        // limit scope to hold auto-lock
        {
            CMclAutoLock lock(m_CritSec);
            m_bJobPosted = TRUE;
            if (!m_bThreadExists) {
                // we need to create a worker thread...
                m_bThreadExists = TRUE;
                m_apWorkerThread = new CMclThread(this);
                printf("DPQ2 thread created.\n");
            }
        }
        return bStatus;
    };
    BOOL CancelJob( CJob *pJob) {
        if (m_cJobList.RemoveJob(pJob) == TRUE) {
            pJob->Cancel();
            return TRUE;
        }
        else {
            return FALSE;
        }
    };
    void CancelAllJobs(void) {
        CJob *pJob;
        while (m_cJobList.DequeueJob( &pJob, NULL, 0)) {
            pJob->Cancel();
        }
    };
};
class CPrintJob : public CJob {
private:
    TCHAR *m_lpString;
    BOOL m_bInUse;
public:
    // constructor...
    CPrintJob(LPCTSTR lpString) {
        // initialize the print job...
        m_lpString = NULL;
        if (lpString) {
            m_lpString = new TCHAR[strlen(lpString) + 1];
            strcpy( m_lpString, lpString);
        }
    };
    // virtual destructor...
    virtual ~CPrintJob() {
        if (m_lpString) {
            delete [] m_lpString;
        }
    };
    void Execute(void) {
        // execute the print job...
        if (m_lpString) {
            printf( "Executing job <%s>.\n", m_lpString);
        }
        // make the job take a small amount of time...
        Sleep(50);
        // jobs delete themselves when complete...
        delete this;
    };
    void Cancel(void) {
        // cancel the print job...
        if (m_lpString) {
            printf( "Canceling job <%s>.\n", m_lpString);
        }
        // jobs delete themselves when canceled...
        delete this;
    };
};
int main( int argc, char *argv[]) {
    CDPQueue2 dpq(10);
    CPrintJob *pPrintJob;
    TCHAR string[32];
    // this DPQ starts automatically...
    // post 100 jobs at irregular intervals
    // between 1 and 99 milliseconds...
    for (int index = 0; index < 100; index++) {
        Sleep((rand() % 99) + 1);
        sprintf( string, "This is job %d", index);
        pPrintJob = new CPrintJob(string);
        dpq.QueueJob(pPrintJob);
    }
    // this DPQ stops automatically...
    // cancel any jobs remaining...
    dpq.CancelAllJobs();
    return 0;
}
The program is largely the same as the previous version. The definition of the CJob class is identical, as is the CJobList implementation and the CPrintJob class. The main function is similar, the only differences being that this type of DPQ object does not have to be started and stopped, and that the CDPQueue2 constructor takes a single argument which sets the number of milliseconds for the internal worker thread to wait for a job to appear in the queue before it exits.
All of the interesting code is inside the implementation of the CDPQueue2 class. The class has been derived from CMclThreadHandler to allow the class to contain its own ThreadHandlerProc and provide an easy way for the internal worker thread to share the member variables with the CDPQueue2 object. From an object-oriented design point of view, an instance of CDPQueue2 is both the DPQ object and the worker thread CMclThreadHandler object.
The CDPQueue2 class contains several private member variables. The member m_cJobList is a CJobList object which will hold pending jobs. m_apWorkerThread is an auto-pointer object used to manage the worker thread object. The CMclEvent object, m_ceControl, is used by the CDPQueue2 object to signal the worker thread to terminate when the CDPQueue2 object is destroyed. The member m_dwTimeout holds the time-out value passed in the constructor.
Before we can discuss the first member function, ThreadHandlerProc, we need to examine the problem of creating and exiting the worker thread at appropriate times. This class is to have an active worker thread process queued jobs for as long as there are jobs in the queue, or until the time-out value passed in the constructor expires. When a job is posted into the queue, if there is no active thread or a currently active thread is terminating, the class must create a worker thread to process the job.
In order to have the worker thread created and destroyed dynamically, when we post a job to the DPQ using the QueueJob member function, we need to check if a thread exists to process the job. If there is no thread, we need to create one. The worker thread runs merrily along taking jobs off the queue and executing them. The worker thread will wait a certain period of time for a job to appear in the queue, and exit if no job appears within the allotted time. Since the CJobList class provides a time-out parameter in the DequeueJob member function, this is simple.
The difficulty is in knowing when you need to create a new worker thread. Detecting when there is no thread available to process a job is trickier than it may first appear. You cannot simply query the thread’s exit code with the GetExitCodeThread function and check for STILL_ACTIVE. The worker thread may have exited the queue processing loop but not have returned from its thread procedure. In this case, the operating system will report that the thread is STILL_ACTIVE, but the job just queued will not be processed because the thread has stopped pulling jobs off the queue.
The solution used here requires the DPQ object to maintain two internal state variables that are protected by a critical section. One state variable, m_bThreadExists, is set to TRUE immediately before the worker thread is created, and is set to FALSE just before breaking out of the main job processing loop. The second state variable, m_bJobPosted, is set to TRUE immediately after a job is posted to the queue, and is set to FALSE after the worker thread has executed a job retrieved from the queue. The purpose of m_bJobPosted is to allow the worker thread to check if a job has been posted in between the time the thread has timed out in trying to retrieve a job off the queue and the time the worker thread decides whether or not to exit. Both m_bThreadExists and m_bJobPosted are checked and set inside a block of code which acquires the critical section object m_CritSec. Since the state variables are only accessed when the critical section has been acquired, the two states are always correct inside the protected blocks of code. They can be used by the QueueJob function to decide if a new worker thread needs to be created, and by the worker thread, when its DequeueJob function returns FALSE, to determine if it is really all right for the thread to exit.
The ThreadHandlerProc runs in a loop as long as the call to DequeueJob returns TRUE, which it will if a job has been successfully retrieved. If so, the job is processed and m_bJobPosted is set to FALSE to indicate that a job has not been posted since the last job was executed. This is done inside a code block protected by the critical section to synchronize with any other thread calling the QueueJob function, which we will discuss shortly.
When DequeueJob returns FALSE, assuming the control event has not been signaled, it means that the time-out period has expired, and the worker thread may want to exit. In this case, the thread acquires the critical section and checks m_bJobPosted. If and only if this is FALSE will the worker thread exit, first setting m_bThreadExists to FALSE so that any thread trying to queue a job will know that it also needs to create a worker thread.
The QueueJob function complements the ThreadHandlerProc by setting m_bThreadExists inside a block of code protected by the same critical section. The member m_bThreadExists can only be FALSE if the worker thread has exited or is in the process of exiting. Furthermore, the member m_bJobPosted is only set to TRUE inside the same block of code, so that the worker thread cannot be in the process of exiting when another thread is in this block without the m_bThreadExists member being set to FALSE. Thus QueueJob will always create a worker thread when one is necessary to process the job just posted.
The rest of the implementation of CDPQueue2 is unchanged from the first version except for the destructor. The worker thread must exit before the CDPQueue2 object is destroyed, so that a running ThreadHandlerProc cannot cause memory access violations by reading and writing freed memory. This is done by signaling the worker thread with the m_ceControl event object and waiting until the thread object is signaled, which indicates that it has exited. Notice that if the last worker thread had previously exited, the thread object will already be signaled and the wait will return immediately.
Example 9-5 shows some sample output from DPQ2.exe.
Example 9-5. Sample output from DPQ2.exe
DPQ2 thread created.
Executing job <This is job 0>.
Executing job <This is job 1>.
DPQ2 thread exiting.
DPQ2 thread created.
Executing job <This is job 2>.
Executing job <This is job 3>.
Executing job <This is job 4>.
DPQ2 thread exiting.
DPQ2 thread created.
Executing job <This is job 5>.
DPQ2 thread exiting.
DPQ2 thread created.
Executing job <This is job 6>.
Executing job <This is job 7>.
Executing job <This is job 8>.
Executing job <This is job 9>.
Executing job <This is job 10>.
Executing job <This is job 11>.
Executing job <This is job 12>.
Executing job <This is job 13>.
Executing job <This is job 14>.
DPQ2 thread exiting.
DPQ2 thread created.
Executing job <This is job 15>.
Executing job <This is job 16>.
Executing job <This is job 17>.
Executing job <This is job 18>.
DPQ2 thread exiting.
DPQ2 thread created.
Executing job <This is job 19>.
Executing job <This is job 20>.
Executing job <This is job 21>.
Executing job <This is job 22>.
Executing job <This is job 23>.
DPQ2 thread exiting.
DPQ2 thread created.
Executing job <This is job 24>.
Executing job <This is job 25>.
Executing job <This is job 26>.
Executing job <This is job 27>.
Executing job <This is job 28>.
Executing job <This is job 29>.
Executing job <This is job 30>.
Executing job <This is job 31>.
Executing job <This is job 32>.
Executing job <This is job 33>.
Executing job <This is job 34>.
Executing job <This is job 35>.
Executing job <This is job 36>.
Executing job <This is job 37>.
Executing job <This is job 38>.
Executing job <This is job 39>.
Executing job <This is job 40>.
Executing job <This is job 41>.
Executing job <This is job 42>.
Executing job <This is job 43>.
Executing job <This is job 44>.
Executing job <This is job 45>.
Executing job <This is job 46>.
Executing job <This is job 47>.
Executing job <This is job 48>.
Executing job <This is job 49>.
Executing job <This is job 50>.
Executing job <This is job 51>.
Executing job <This is job 52>.
Executing job <This is job 53>.
Executing job <This is job 54>.
Executing job <This is job 55>.
Executing job <This is job 56>.
Executing job <This is job 57>.
Executing job <This is job 58>.
Executing job <This is job 59>.
Executing job <This is job 60>.
Executing job <This is job 61>.
Executing job <This is job 62>.
Executing job <This is job 63>.
Executing job <This is job 64>.
Executing job <This is job 65>.
Executing job <This is job 66>.
Executing job <This is job 67>.
Executing job <This is job 68>.
Executing job <This is job 69>.
Executing job <This is job 70>.
Executing job <This is job 71>.
Executing job <This is job 72>.
Executing job <This is job 73>.
Executing job <This is job 74>.
Executing job <This is job 75>.
Executing job <This is job 76>.
Executing job <This is job 77>.
Executing job <This is job 78>.
Executing job <This is job 79>.
Executing job <This is job 80>.
Executing job <This is job 81>.
Executing job <This is job 82>.
Executing job <This is job 83>.
Executing job <This is job 84>.
Executing job <This is job 85>.
Executing job <This is job 86>.
Executing job <This is job 87>.
Executing job <This is job 88>.
Executing job <This is job 89>.
Executing job <This is job 90>.
Executing job <This is job 91>.
Executing job <This is job 92>.
Executing job <This is job 93>.
Executing job <This is job 94>.
Executing job <This is job 95>.
Executing job <This is job 96>.
Executing job <This is job 97>.
Canceling job <This is job 98>.
Canceling job <This is job 99>.
DPQ2 thread exiting.
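The two-flag handshake of the second DPQ can also be sketched without Win32, using standard C++11. This is an illustration under stated assumptions, not the chapter's code: TimedQueue, OnDemandQueue, and ThreadsCreated are hypothetical names, jobs are plain callables, there is no control event, and the destructor simply waits for the last worker to finish its work and time out. One detail is an addition that does not appear in the listing: when a timed-out wait finds the job-posted flag still set, the sketch clears it before trying again, so that a stale flag costs at most one extra idle period.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

typedef std::function<void()> Job;   // stand-in for a CJob-derived object

// Thread-safe queue with its own lock, playing the role of CJobList.
class TimedQueue {
public:
    void Push(Job job) {
        { std::lock_guard<std::mutex> lock(m_mutex); m_jobs.push_back(std::move(job)); }
        m_notEmpty.notify_one();
    }
    // Returns false if no job arrived within the timeout.
    bool Pop(Job &job, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!m_notEmpty.wait_for(lock, timeout, [this] { return !m_jobs.empty(); }))
            return false;
        job = std::move(m_jobs.front());
        m_jobs.pop_front();
        return true;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::deque<Job> m_jobs;
};

class OnDemandQueue {
public:
    explicit OnDemandQueue(std::chrono::milliseconds idle)
        : m_idle(idle), m_threadExists(false), m_jobPosted(false), m_threadsCreated(0) {}
    ~OnDemandQueue() {
        // the worker drains the queue and then times out on its own
        if (m_worker.joinable()) m_worker.join();
    }
    void QueueJob(Job job) {
        assert(job != nullptr);
        m_queue.Push(std::move(job));                  // post the job first...
        std::lock_guard<std::mutex> lock(m_state);
        m_jobPosted = true;                            // ...then record the post
        if (!m_threadExists) {
            if (m_worker.joinable()) m_worker.join();  // reap the worker that exited
            m_threadExists = true;
            ++m_threadsCreated;
            m_worker = std::thread(&OnDemandQueue::Run, this);
        }
    }
    int ThreadsCreated() {
        std::lock_guard<std::mutex> lock(m_state);
        return m_threadsCreated;
    }
private:
    void Run() {
        Job job;
        for (;;) {
            if (m_queue.Pop(job, m_idle)) {
                job();
                std::lock_guard<std::mutex> lock(m_state);
                m_jobPosted = false;
            } else {
                std::lock_guard<std::mutex> lock(m_state);
                if (!m_jobPosted) {
                    m_threadExists = false;   // the next QueueJob must create a thread
                    break;
                }
                m_jobPosted = false;          // addition: give a stale flag one more pass
            }
        }
    }
    TimedQueue m_queue;
    const std::chrono::milliseconds m_idle;
    std::mutex m_state;        // protects the three members below
    bool m_threadExists;
    bool m_jobPosted;
    int m_threadsCreated;
    std::thread m_worker;
};
```

The exit decision and the creation decision are both made while holding the same state lock, which is what keeps a job from being stranded between a departing worker and a QueueJob call that believes a worker still exists.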
Which type of DPQ, or, more generally, which type of active object, is better? That depends upon the task at hand and the overall system design. The first type is simpler, and only creates a single thread for as long as it runs. It does require the client to control stopping and starting, which may be inconvenient at times. Also, for active objects which usually have nothing to do, the thread object spends most of its time taking up system resources and not doing any work. This could be a problem if your software has many active objects running at once.
The second type of DPQ (besides serving as an example of how to solve a difficult thread synchronization problem) is useful when the system has many active objects, each of which gets work in bursts separated by extended periods without any jobs to process. The second type is also easier for the client software to manage, since it largely manages itself. You will have to examine the run-time characteristics of your software to determine which type of active object performs better in your own system.