From: Anteru (newsgroups_at_[hidden])
Date: 2008-04-08 12:43:18
Anthony Williams wrote:
> It is not possible to detect a spurious wake: if it was, then the
> implementation would resume waiting. However, spurious wakes should be rare
> occurrences.
All right, maybe this could be added to the documentation somewhere.
> Can you show me some code?
Worker threads are supposed to die as soon as they encounter a null job.
The worker threads call executeJob. It works with queueFull_.notifyAll ()
in insertJob, but it fails with notifyOne. Note that these simply pass
through to their boost::condition counterparts.
It used to work with a home-grown condition implementation; moreover, I
had a prototype in Python, and that one worked too. The problem is that
during the destructor not all threads die properly (usually one or two
remain if started with 4 threads on a dual-core), and I can reproduce it
nearly always. Adding some debug output usually makes it work, so it does
look like a race condition. I only observed a deadlock if the following
happened:
insertJob called
insertJob called
...
then one thread remained waiting forever. However, if only one job gets
inserted at a time, it works. Tested on a dual-core, so it was really
concurrent. Note: it may well deadlock in other cases too; it's just
*damn* difficult to observe properly.
The only problem I had at the beginning was in this part:
queueFull_.wait (lock);
if (jobQueue_.empty ())
{
        continue;
}
This is needed because one thread might have been waiting for mutex_ and
then fast-track through the executeJob loop without waiting on the
condition: it gets its job and executes it. Meanwhile, the notifyOne
called during insertJob wakes up a waiting thread /after/ the other thread
has removed the item, so the woken thread has to re-check that the queue
still has an item. Without this, a thread might pop an empty queue.
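For comparison, the re-check described above is commonly written as a loop around the wait, so that a woken thread only leaves the wait when the queue really has an item. The following is a minimal sketch of that pattern, not the pool from this post: it assumes C++11 std::condition_variable and std::thread in place of the Condition, Mutex and WorkerThread wrappers, and MiniPool, workerLoop, shutdown and runDemo are hypothetical names. An empty std::function stands in for the null job.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the pool in this post; all names are illustrative.
class MiniPool {
public:
    using Job = std::function<void()>;  // an empty Job plays the role of the null job

    explicit MiniPool(unsigned threadCount) {
        for (unsigned i = 0; i < threadCount; ++i)
            workers_.emplace_back([this] { workerLoop(); });
    }

    ~MiniPool() { shutdown(); }

    void insertJob(Job job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobQueue_.push(std::move(job));
        }
        queueFull_.notify_one();  // one new item, so waking one worker is enough
    }

    // Queue one empty job per worker, then join them all.
    // A second call is a no-op because workers_ is empty by then.
    void shutdown() {
        for (std::size_t i = 0; i < workers_.size(); ++i)
            insertJob(Job());
        for (std::thread& t : workers_)
            t.join();
        workers_.clear();
    }

private:
    void workerLoop() {
        for (;;) {
            Job job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Loop on the predicate: a wake-up (spurious, or one where
                // another worker already took the item) just waits again.
                while (jobQueue_.empty())
                    queueFull_.wait(lock);
                assert(!jobQueue_.empty());
                job = std::move(jobQueue_.front());
                jobQueue_.pop();
            }
            if (!job)
                return;  // empty job: this worker exits
            job();       // run outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable queueFull_;
    std::queue<Job> jobQueue_;
    std::vector<std::thread> workers_;
};

// Runs `jobs` counting jobs on `threads` workers and returns how many ran.
int runDemo(unsigned threads, int jobs) {
    std::atomic<int> counter{0};
    MiniPool pool(threads);
    for (int i = 0; i < jobs; ++i)
        pool.insertJob([&counter] { ++counter; });
    pool.shutdown();
    return counter.load();
}
```

Because the queue is FIFO and the empty jobs are queued last, every real job is popped before any worker sees an empty one, so runDemo (4, 100) should report all 100 jobs as executed.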
class ThreadPool
{
private:
        std::queue <IJob::Ptr> jobQueue_;
        std::vector <WorkerThread::Ptr> workers_;
        Condition queueEmpty_, queueFull_;
        Mutex mutex_;
        uint threadCount_;
};
//////////////////////////////////////////////////////////////////////////
void ThreadPool::stopAllThreads ()
{
        for (int i = 0; i < threadCount_; ++i)
        {
                insertJob (IJob::Ptr ());
                waitFinished ();
        }
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::insertJob (ThreadPool::IJob::Ptr job)
{
        Lock lock (mutex_);
        jobQueue_.push (job);
        // notifyOne leads to a deadlock here, so notify all
        queueFull_.notifyAll ();
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::executeJob ()
{
        while (true)
        {
                IJob::Ptr job;
                {
                        Lock lock (mutex_);
                        if (jobQueue_.empty ())
                        {
                                queueFull_.wait (lock);
                                // Deadlocks here, with jobQueue_.empty () == false!
                                if (jobQueue_.empty ())
                                {
                                        continue;
                                }
                        }
                        job = jobQueue_.front ();
                        jobQueue_.pop ();
                }
                if (job.get () == 0)
                {
                        jobFinished ();
                        return;
                }
                else
                {
                        job->run ();
                        jobFinished ();
                }
        }
}
//////////////////////////////////////////////////////////////////////////
ThreadPool::ThreadPool (const uint threadCount)
        : threadCount_ (threadCount)
{
        for (uint i = 0; i < threadCount; ++i)
        {
                WorkerThread::Ptr w (new WorkerThread (this));
                workers_.push_back (w);
                w->run ();
        }
}
//////////////////////////////////////////////////////////////////////////
ThreadPool::~ThreadPool ()
{
        waitFinished ();
        // Gets through, no work waiting
        stopAllThreads ();
        joinAllThreads ();
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::joinAllThreads ()
{
        BOOST_FOREACH(Thread::Ptr t, workers_)
        {
                t->join ();
        }
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::jobFinished ()
{
        Lock lock (mutex_);
        if (jobQueue_.empty ())
        {
                queueEmpty_.notifyAll ();
        }
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::waitFinished ()
{
        Lock lock (mutex_);
        if (jobQueue_.empty ())
        {
                return;
        }
        else
        {
                queueEmpty_.wait (lock);
        }
}
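Since a spurious wake cannot be detected, the same predicate idea can be applied to waitFinished: instead of waiting once inside an if, the wait is tied to a condition that is re-evaluated on every wake-up. The sketch below is only an illustration under stated assumptions. It uses C++11 std::condition_variable, whose wait has an overload taking a predicate, rather than the Condition wrapper, and a hypothetical PendingJobs counter that tracks jobs from insertion until they have finished running, not just until they leave the queue. demoWaitFinished is likewise a made-up driver.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper, not part of the posted ThreadPool.
class PendingJobs {
public:
    // Call when a job is queued.
    void jobInserted() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++pending_;
    }

    // Call after a job has finished running.
    void jobFinished() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(pending_ > 0);
        if (--pending_ == 0)
            allDone_.notify_all();
    }

    // Blocks until every inserted job has finished. The predicate overload
    // re-checks pending_ after each wake-up, so a spurious wake is harmless.
    void waitFinished() {
        std::unique_lock<std::mutex> lock(mutex_);
        allDone_.wait(lock, [this] { return pending_ == 0; });
    }

    unsigned pending() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable allDone_;
    unsigned pending_ = 0;
};

// Registers `jobs` jobs, finishes them on another thread, waits for them,
// and returns the number still pending after the wait.
unsigned demoWaitFinished(unsigned jobs) {
    PendingJobs tracker;
    for (unsigned i = 0; i < jobs; ++i)
        tracker.jobInserted();
    std::thread finisher([&tracker, jobs] {
        for (unsigned i = 0; i < jobs; ++i)
            tracker.jobFinished();
    });
    tracker.waitFinished();
    unsigned left = tracker.pending();
    finisher.join();
    return left;
}
```

Counting jobs that are still running matters because an empty queue alone does not say whether the last job popped has completed.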
Cheers,
   Anteru