$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r52298 - in sandbox/threadpool: boost/tp boost/tp/detail libs/tp/build libs/tp/src
From: oliver.kowalke_at_[hidden]
Date: 2009-04-09 12:04:40
Author: olli
Date: 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
New Revision: 52298
URL: http://svn.boost.org/trac/boost/changeset/52298
Log:
* restructuring of pool, worker
* introduction of worker_group
Added:
   sandbox/threadpool/boost/tp/detail/worker_group.hpp   (contents, props changed)
   sandbox/threadpool/libs/tp/src/worker_group.cpp   (contents, props changed)
Text files modified: 
   sandbox/threadpool/boost/tp/detail/worker.hpp |   297 +++++++++++++++++++++++++++--------     
   sandbox/threadpool/boost/tp/pool.hpp          |   330 +++++++++------------------------------ 
   sandbox/threadpool/libs/tp/build/Jamfile.v2   |     2                                         
   sandbox/threadpool/libs/tp/src/worker.cpp     |   157 +-----------------                      
   4 files changed, 320 insertions(+), 466 deletions(-)
Modified: sandbox/threadpool/boost/tp/detail/worker.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/worker.hpp	(original)
+++ sandbox/threadpool/boost/tp/detail/worker.hpp	2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -11,13 +11,17 @@
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
 #include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#include <boost/random.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
 #include <boost/utility.hpp>
 
 #include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/guard.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 #include <boost/tp/detail/wsq.hpp>
+#include <boost/tp/poolsize.hpp>
+#include <boost/tp/scanns.hpp>
 
 namespace boost {
 namespace tp {
@@ -28,71 +32,237 @@
 private:
         static thread_specific_ptr< worker >	tss_;
 
-	class impl : private noncopyable
+	struct impl
         {
-	private:
-		typedef std::pair< callable, interrupter >	item;
-		shared_ptr< thread >						thrd_;
-		wsq< item > 								wsq_;
-		interprocess::interprocess_semaphore		shtdwn_sem_;
-		interprocess::interprocess_semaphore		shtdwn_now_sem_;
-		bool										shtdwn_;
-		std::size_t									scns_;
-
-	public:
-		impl( function< void() > const&);
-
                 virtual ~impl() {}
 
-		const shared_ptr< thread > thrd() const;
-
-		const thread::id get_id() const;
+		virtual const thread::id get_id() const = 0;
 
-		void join() const;
+		virtual void join() const = 0;
 
-		void interrupt() const;
+		virtual void interrupt() const = 0;
 
-		void put( callable const&, interrupter const&);
+		virtual void put( callable const&, interrupter const&) = 0;
 
-		bool try_take( callable &, interrupter &);
+		virtual bool try_take( callable &, interrupter &) = 0;
 
-		bool try_steal( callable &, interrupter &);
+		virtual bool try_steal( callable &, interrupter &) = 0;
 
-		bool empty();
+		virtual void signal_shutdown() = 0;
 
-		void signal_shutdown();
+		virtual void signal_shutdown_now() = 0;
 
-		void signal_shutdown_now();
+		virtual void schedule_until( function< bool() > const&) = 0;
 
-		bool shutdown();
-
-		bool shutdown_now();
-
-		std::size_t scanns() const;
-
-		void increment_scanns();
-
-		void reset_scanns();
-
-		virtual void reschedule_until( function< bool() > const&) = 0;
+		virtual void run() = 0;
         };
 
         template< typename Pool >
-	class impl_pool : public impl
+	class impl_pool : public impl,
+					  private noncopyable
         {
         private:
-		Pool	&	p_;
+		class random_idx
+		{
+		private:
+			rand48 rng_;
+			uniform_int<> six_;
+			variate_generator< rand48 &, uniform_int<> > die_;
+	
+		public:
+			random_idx( std::size_t size)
+			:
+			rng_(),
+			six_( 0, size - 1),
+			die_( rng_, six_)
+			{}
+	
+			std::size_t operator()()
+			{ return die_(); }
+		};
 
-	public:
-		impl_pool( Pool & p, function< void() > const& fn)
-		: impl( fn), p_( p)
-		{}
+		typedef std::pair< callable, interrupter >	item;
 
-		void reschedule_until( function< bool() > const& pred)
-		{ p_.reschedule_until_( pred); }
+		Pool									&	pool_;
+		shared_ptr< thread >						thrd_;
+		wsq< item > 								wsq_;
+		interprocess::interprocess_semaphore		shtdwn_sem_;
+		interprocess::interprocess_semaphore		shtdwn_now_sem_;
+		bool										shtdwn_;
+		posix_time::time_duration					asleep_;
+		scanns										max_scns_;
+		std::size_t									scns_;
+		random_idx									rnd_idx_;
+
+		void execute_(
+			callable & ca,
+			interrupter & intr)
+		{
+			BOOST_ASSERT( ! ca.empty() );
+			guard grd( get_pool().active_worker_);
+			shared_ptr< void > ig(
+				static_cast< void * >( 0),
+				boost::bind(
+					& interrupter::reset,
+					intr) );
+			intr.set( thrd_);
+			ca();
+			ca.clear();
+			BOOST_ASSERT( ca.empty() );
+		}
+	
+		void next_callable_( callable & ca, interrupter & intr)
+		{
+			if ( ! try_take( ca, intr) )
+			{
+				if ( ! get_pool().channel_.try_take( ca, intr) )
+				{
+					std::size_t idx( rnd_idx_() );
+					for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
+					{
+						worker other( get_pool().wg_[idx]);
+						if ( this_thread::get_id() == other.get_id() ) continue;
+						if ( ++idx >= get_pool().wg_.size() ) idx = 0;
+						if ( other.try_steal( ca, intr) ) break;
+					}
+	
+					if ( ca.empty() )
+					{
+						guard grd( get_pool().idle_worker_);
+						if ( shutdown_() ) return;
+						++scns_;
+						if ( scns_ >= max_scns_)
+						{
+							if ( get_pool().size_() == get_pool().idle_worker_)
+								get_pool().channel_.take( ca, intr, asleep_);
+							else
+								this_thread::sleep( asleep_);
+							scns_ = 0;
+						}
+						else
+							this_thread::yield();
+					}
+				}
+			}
+		}
+
+		bool shutdown_()
+		{
+			if ( shutdown__() && get_pool().channel_.empty() )
+				return true;
+			else if ( shutdown_now__() )
+				return true;
+			return false;
+		}
+
+		bool shutdown__()
+		{
+			if ( ! shtdwn_)
+				shtdwn_ = shtdwn_sem_.try_wait();
+			return shtdwn_;
+		}
+		
+		bool shutdown_now__()
+		{ return shtdwn_now_sem_.try_wait(); }
+
+	public:
+		impl_pool(
+			Pool & pool,
+			poolsize const& psize,
+			posix_time::time_duration const& asleep,
+			scanns const& max_scns,
+			function< void() > const& fn)
+		:
+		pool_( pool),
+		thrd_( new thread( fn) ),
+		wsq_(),
+		shtdwn_sem_( 0),
+		shtdwn_now_sem_( 0),
+		shtdwn_( false),
+		asleep_( asleep),
+		max_scns_( max_scns),
+		scns_( 0),
+		rnd_idx_( psize)
+		{ BOOST_ASSERT( ! fn.empty() ); }
+
+		const thread::id get_id() const
+		{ return thrd_->get_id(); }
+
+		void join() const
+		{ thrd_->join(); }
+
+		void
+		interrupt() const
+		{ thrd_->interrupt(); }
+
+		void signal_shutdown()
+		{ shtdwn_sem_.post(); }
+		
+		void signal_shutdown_now()
+		{ shtdwn_now_sem_.post(); }
+
+		void put(
+			callable const& ca,
+			interrupter const& intr)
+		{
+			BOOST_ASSERT( ! ca.empty() );
+			wsq_.put( std::make_pair( ca, intr) );
+		}
+
+		bool try_take(
+			callable & ca,
+			interrupter & intr)
+		{
+			item itm;
+			bool result( wsq_.try_take( itm) );
+			if ( result)
+			{
+				ca = itm.first;
+				intr = itm.second;
+			}
+			return result;
+		}
+		
+		bool try_steal(
+			callable & ca,
+			interrupter & intr)
+		{
+			item itm;
+			bool result( wsq_.try_steal( itm) );
+			if ( result)
+			{
+				ca = itm.first;
+				intr = itm.second;
+			}
+			return result;
+		}
 
                 Pool & get_pool() const
-		{ return p_; }
+		{ return pool_; }
+
+		void run()
+		{
+			BOOST_ASSERT( get_id() == this_thread::get_id() );
+
+			schedule_until(
+				bind( & impl_pool::shutdown_, this) );
+		}
+	
+		void schedule_until( function< bool() > const& pred)
+		{
+			callable ca;
+			interrupter intr;
+	
+			while ( ! pred() )
+			{
+				next_callable_( ca, intr);
+				if( ! ca.empty() )
+				{
+					execute_( ca, intr);
+					scns_ = 0;
+				}
+			}
+		}
         };
 
         shared_ptr< impl >	impl_;
@@ -101,40 +271,31 @@
         template< typename Pool >
         worker(
                 Pool & pool,
+		poolsize const& psize,
+		posix_time::time_duration const& asleep,
+		scanns const& max_scns,
                 function< void() > const& fn)
-	: impl_( new impl_pool< Pool >( pool, fn) )
+	:
+	impl_(
+		new impl_pool< Pool >(
+			pool,
+			psize,
+			asleep,
+			max_scns,
+			fn) )
         {}
 
-	const shared_ptr< thread > thrd() const;
-
         const thread::id get_id() const;
 
         void join() const;
-
         void interrupt() const;
+	void signal_shutdown();
+	void signal_shutdown_now();
 
         void put( callable const&, interrupter const&);
-
         bool try_take( callable &, interrupter &);
-
         bool try_steal( callable &, interrupter &);
 
-	bool empty() const;
-
-	void signal_shutdown();
-
-	void signal_shutdown_now();
-
-	bool shutdown();
-
-	bool shutdown_now();
-
-	std::size_t scanns() const;
-
-	void increment_scanns();
-
-	void reset_scanns();
-
         void reschedule_until( function< bool() > const&);
 
         template< typename Pool >
@@ -145,9 +306,9 @@
                 return p->get_pool();
         }
 
-	static worker * tss_get();
+	void run();
 
-	static void tss_reset( worker * w);
+	static worker * tss_get();
 };
 
 } } }
Added: sandbox/threadpool/boost/tp/detail/worker_group.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker_group.hpp	2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -0,0 +1,82 @@
+//  Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+//  Software License, Version 1.0. (See accompanying file
+//  LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_GROUP_H
+#define BOOST_TP_DETAIL_WORKER_GROUP_H
+
+#include <cstddef>
+
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/random_access_index.hpp>
+#include <boost/thread.hpp>
+
+#include <boost/tp/detail/worker.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+class worker_group
+{
+private:
+	struct id_idx_tag {};
+	struct rnd_idx_tag {};
+
+	typedef multi_index::multi_index_container<
+		worker,
+		multi_index::indexed_by<
+			multi_index::ordered_unique<
+				multi_index::tag< id_idx_tag >,
+				multi_index::const_mem_fun<
+					worker,
+					const thread::id,
+					& worker::get_id
+				>
+			>,
+			multi_index::random_access< multi_index::tag< rnd_idx_tag > >
+		>
+	>				container;
+
+	typedef container::index< id_idx_tag >::type		id_idx;
+	typedef container::index< rnd_idx_tag >::type		rnd_idx;
+
+	container		cont_;
+	id_idx		&	id_idx_;
+	rnd_idx		&	rnd_idx_;
+
+public:
+	typedef id_idx::iterator		iterator;
+	typedef id_idx::const_iterator	const_iterator;
+
+	worker_group();
+
+	const worker operator[]( std::size_t pos) const;
+
+	std::size_t size() const;
+
+	const iterator begin();
+
+	const const_iterator begin() const;
+
+	const iterator end();
+
+	const const_iterator end() const;
+
+	const const_iterator find( thread::id const& id) const;
+
+	void join_all();
+
+	void interrupt_all();
+
+	void insert( worker const& w);
+
+	void signal_shutdown_all();
+
+	void signal_shutdown_now_all();
+};
+} } }
+
+#endif // BOOST_TP_DETAIL_WORKER_GROUP_H
+
Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp	(original)
+++ sandbox/threadpool/boost/tp/pool.hpp	2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -16,12 +16,7 @@
 #include <boost/bind.hpp>
 #include <boost/foreach.hpp>
 #include <boost/function.hpp>
-#include <boost/multi_index_container.hpp>
-#include <boost/multi_index/mem_fun.hpp>
-#include <boost/multi_index/ordered_index.hpp>
-#include <boost/multi_index/random_access_index.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/random.hpp>
 #include <boost/ref.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
@@ -30,12 +25,12 @@
 
 #include <boost/tp/detail/atomic.hpp>
 #include <boost/tp/detail/callable.hpp>
-#include <boost/tp/detail/guard.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
 #include <boost/tp/detail/worker.hpp>
+#include <boost/tp/detail/worker_group.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/poolsize.hpp>
 #include <boost/tp/scanns.hpp>
@@ -59,199 +54,62 @@
         typedef Channel						channel;
         typedef typename channel::item		channel_item;
 
-	enum state
-	{
-		active_state,
-		terminateing_state,
-		terminated_state
-	};
-
-	struct id_idx_tag {};
-	struct rnd_idx_tag {};
-
-	typedef multi_index::multi_index_container<
-		detail::worker,
-		multi_index::indexed_by<
-			multi_index::ordered_unique<
-				multi_index::tag< id_idx_tag >,
-				multi_index::const_mem_fun<
-					detail::worker,
-					const thread::id,
-					& detail::worker::get_id
-				>
-			>,
-			multi_index::random_access< multi_index::tag< rnd_idx_tag > >
-		>
-	>										worker_list;
-
-	typedef typename worker_list::template index<
-		id_idx_tag >::type					id_idx;
-	typedef typename worker_list::template index<
-		rnd_idx_tag >::type					rnd_idx;
-
-	class random_idx
-	{
-	private:
-		rand48 rng_;
-		uniform_int<> six_;
-		variate_generator< rand48 &, uniform_int<> > die_;
-
-	public:
-		random_idx( std::size_t size)
-		:
-		rng_(),
-		six_( 0, size - 1),
-		die_( rng_, six_)
-		{}
-
-		std::size_t operator()()
-		{ return die_(); }
-	};
-
-	static thread_specific_ptr< random_idx >	tss_rnd_idx_;
 
-	worker_list									worker_;
-	shared_mutex								mtx_worker_;
+	detail::worker_group						wg_;
+	shared_mutex								mtx_wg_;
         unsigned int								state_;
         channel		 								channel_;
-	posix_time::time_duration					asleep_;
-	scanns										scns_;
         unsigned int								active_worker_;
         unsigned int								idle_worker_;
-	unsigned int								running_worker_;
-
-	void execute_(
-		detail::callable & ca,
-		detail::interrupter & intr,
-		shared_ptr< thread > const& thrd)
-	{
-		BOOST_ASSERT( ! ca.empty() );
-		detail::guard grd( active_worker_);
-		shared_ptr< void > ig(
-			static_cast< void * >( 0),
-			boost::bind(
-				& detail::interrupter::reset,
-				intr) );
-		intr.set( thrd);
-		ca();
-		ca.clear();
-		BOOST_ASSERT( ca.empty() );
-	}
-
-	void next_callable_( detail::worker & w, detail::callable & ca, detail::interrupter & intr)
-	{
-		rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
-		if ( ! w.try_take( ca, intr) )
-		{
-			if ( ! channel_.try_take( ca, intr) )
-			{
-				std::size_t idx( ( * tss_rnd_idx_)() );
-				for ( std::size_t j( 0); j < worker_.size(); ++j)
-				{
-					detail::worker other( ridx[idx]);
-					if ( this_thread::get_id() == other.get_id() ) continue;
-					if ( ++idx >= worker_.size() ) idx = 0;
-					if ( other.try_steal( ca, intr) ) break;
-				}
-
-				if ( ca.empty() )
-				{
-					detail::guard grd( idle_worker_);
-					if ( shutdown_( w) ) return;
-					w.increment_scanns();
-					if ( w.scanns() >= scns_)
-					{
-						if ( size_() == idle_worker_)
-							channel_.take( ca, intr, asleep_);
-						else
-							this_thread::sleep( asleep_);
-						w.reset_scanns();
-					}
-					else
-						this_thread::yield();
-				}
-			}
-		}
-	}
 
-	void reschedule_until_( function< bool() > const& pred)
+	void worker_entry_()
         {
-		detail::worker * w( detail::worker::tss_get() );
-		BOOST_ASSERT( w);
-		BOOST_ASSERT( w->get_id() == this_thread::get_id() );
-		shared_ptr< thread > thrd( w->thrd() );
-		BOOST_ASSERT( thrd);
-		detail::interrupter intr;
-		detail::callable ca;
-		while ( ! pred() )
-		{
-			next_callable_( * w, ca, intr);
-			if( ! ca.empty() )
-			{
-				execute_( ca, intr, thrd);
-				w->reset_scanns();
-			}
-		}
-	}
-
-	void entry_()
-	{
-		shared_lock< shared_mutex > lk( mtx_worker_);
-		id_idx & iidx( worker_.get< id_idx_tag >() );
-		typename id_idx::iterator i( iidx.find( this_thread::get_id() ) );
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		typename detail::worker_group::iterator i( wg_.find( this_thread::get_id() ) );
                 lk.unlock();
-		BOOST_ASSERT( i != iidx.end() );
-		detail::worker::tss_reset( new detail::worker( * i) );
-		
-		detail::worker * w( detail::worker::tss_get() );
-		BOOST_ASSERT( w);	
-		BOOST_ASSERT( w->get_id() == this_thread::get_id() );
-		shared_ptr< thread > thrd( w->thrd() );
-		BOOST_ASSERT( thrd);
-		detail::callable ca;
-		detail::interrupter intr;
-
-		tss_rnd_idx_.reset( new random_idx( worker_.size() ) );
+		BOOST_ASSERT( i != wg_.end() );
 
-		detail::guard grd( running_worker_);
-
-		while ( ! shutdown_( * w) )
-		{
-			next_callable_( * w, ca, intr);
-			if( ! ca.empty() )
-			{
-				execute_( ca, intr, thrd);
-				w->reset_scanns();
-			}
-		}
+		detail::worker w( * i);
+		w.run();
         }
-
-	void create_worker_()
+	
+	void create_worker_(
+		poolsize const& psize,
+		posix_time::time_duration const& asleep,
+		scanns const& max_scns)
         {
-		BOOST_ASSERT( ! terminateing_() && ! terminated_() );
-		worker_.insert(
+		wg_.insert(
                         detail::worker(
                                 * this,
+				psize,
+				asleep,
+				max_scns,
                                 boost::bind(
-					& pool::entry_,
+					& pool::worker_entry_,
                                         this) ) );
         }
 
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
-	void entry_( std::size_t n)
+	void worker_entry_( std::size_t n)
         {
                 this_thread::bind_to_processor( n);
-		entry_();
+		worker_entry_();
         }
 
-	void create_worker_( std::size_t n)
+	void create_worker_(
+		poolsize const& psize,
+		posix_time::time_duration const& asleep,
+		scanns const& max_scns,
+		std::size_t n)
         {
-		BOOST_ASSERT( ! terminateing_() && ! terminated_() );
-		worker_.insert(
+		wg_.insert(
                         detail::worker(
                                 * this,
+				psize,
+				asleep,
+				max_scns,
                                 boost::bind(
-					& pool::entry_,
+					& pool::worker_entry_,
                                         this,
                                         n) ) );
         }
@@ -264,22 +122,7 @@
         { return size_() - active_(); }
 
         std::size_t size_() const
-	{ return worker_.size(); }
-
-	bool terminated_() const
-	{ return state_ == terminated_state; }
-
-	bool terminateing_() const
-	{ return state_ == terminateing_state; }
-
-	bool shutdown_( detail::worker & w)
-	{
-		if ( w.shutdown() && channel_.empty() )
-			return true;
-		else if ( w.shutdown_now() )
-			return true;
-		return false;
-	}
+	{ return wg_.size(); }
 
         bool closed_() const
         { return state_ > 0; }
@@ -291,24 +134,21 @@
         explicit pool(
                 poolsize const& psize,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 10),
-		scanns const& scns = scanns( 20) )
+		scanns const& max_scns = scanns( 20) )
         :
-	worker_(),
-	mtx_worker_(),
+	wg_(),
+	mtx_wg_(),
         state_( 0),
         channel_(),
-	asleep_( asleep),
-	scns_( scns),
         active_worker_( 0),
-	idle_worker_( 0),
-	running_worker_( 0)
+	idle_worker_( 0)
         {
-		if ( asleep_.is_special() || asleep_.is_negative() )
+		if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 channel_.activate();
-		unique_lock< shared_mutex > lk( mtx_worker_);
+		unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
-			create_worker_();
+			create_worker_( psize, asleep, max_scns);
                 lk.unlock();
         }
 
@@ -317,52 +157,46 @@
                 high_watermark const& hwm,
                 low_watermark const& lwm,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 100),
-		scanns const& scns = scanns( 20) )
+		scanns const& max_scns = scanns( 20) )
         :
-	worker_(),
-	mtx_worker_(),
+	wg_(),
+	mtx_wg_(),
         state_( 0),
         channel_(
                 hwm,
                 lwm),
-	asleep_( asleep),
-	scns_( scns),
         active_worker_( 0),
-	idle_worker_( 0),
-	running_worker_( 0)
+	idle_worker_( 0)
         {
-		if ( asleep_.is_special() || asleep_.is_negative() )
+		if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 channel_.activate();
-		unique_lock< shared_mutex > lk( mtx_worker_);
+		unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
-			create_worker_();
+			create_worker_( psize, asleep, max_scns);
                 lk.unlock();
         }
 
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
         explicit pool(
                 posix_time::time_duration const& asleep = posix_time::microseconds( 10),
-		scanns const& scns = scanns( 20) )
+		scanns const& max_scns = scanns( 20) )
         :
-	worker_(),
-	mtx_worker_(),
+	wg_(),
+	mtx_wg_(),
         state_( 0),
         channel_(),
-	asleep_( asleep),
-	scns_( scns),
         active_worker_( 0),
-	idle_worker_( 0),
-	running_worker_( 0)
+	idle_worker_( 0)
         {
-		if ( asleep_.is_special() || asleep_.is_negative() )
+		if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 std::size_t psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
-		unique_lock< shared_mutex > lk( mtx_worker_);
+		unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
-			create_worker_( i);
+			create_worker_( psize, asleep, max_scns, i);
                 lk.unlock();
         }
 
@@ -370,28 +204,25 @@
                 high_watermark const& hwm,
                 low_watermark const& lwm,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 100),
-		scanns const& scns = scanns( 20) )
+		scanns const& max_scns = scanns( 20) )
         :
-	worker_(),
-	mtx_worker_(),
+	wg_(),
+	mtx_wg_(),
         state_( 0),
         channel_(
                 hwm,
                 lwm),
-	asleep_( asleep),
-	scns_( scns),
         active_worker_( 0),
-	idle_worker_( 0),
-	running_worker_( 0)
+	idle_worker_( 0)
         {
-		if ( asleep_.is_special() || asleep_.is_negative() )
+		if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 std::size_t psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
-		unique_lock< shared_mutex > lk( mtx_worker_);
+		unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
-			create_worker_( i);
+			create_worker_( psize, asleep, max_scns, i);
                 lk.unlock();
         }
 #endif
@@ -401,26 +232,24 @@
 
         std::size_t active()
         {
-		shared_lock< shared_mutex > lk( mtx_worker_);
+		shared_lock< shared_mutex > lk( mtx_wg_);
                 return active_();
         }
 
         std::size_t idle()
         {
-		shared_lock< shared_mutex > lk( mtx_worker_);
+		shared_lock< shared_mutex > lk( mtx_wg_);
                 return idle_();
         }
-
+	
         void shutdown()
         {
                 if ( closed_() || close_() > 1) return;
 
                 channel_.deactivate();
-		shared_lock< shared_mutex > lk( mtx_worker_);
-		BOOST_FOREACH( detail::worker w, worker_)
-		{ w.signal_shutdown(); }
-		BOOST_FOREACH( detail::worker w, worker_)
-		{ w.join(); }
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		wg_.signal_shutdown_all();
+		wg_.join_all();
                 lk.unlock();
         }
 
@@ -430,14 +259,10 @@
                         return std::vector< detail::callable >();
 
                 channel_.deactivate_now();
-		shared_lock< shared_mutex > lk( mtx_worker_);
-		BOOST_FOREACH( detail::worker w, worker_)
-		{
-			w.signal_shutdown_now();
-			w.interrupt();
-		}
-		BOOST_FOREACH( detail::worker w, worker_)
-		{ w.join(); }
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		wg_.signal_shutdown_now_all();
+		wg_.interrupt_all();
+		wg_.join_all();
                 lk.unlock();
                 std::vector< detail::callable > drain( channel_.drain() );
 
@@ -446,7 +271,7 @@
 
         std::size_t size()
         {
-		shared_lock< shared_mutex > lk( mtx_worker_);
+		shared_lock< shared_mutex > lk( mtx_wg_);
                 return size_();
         }
 
@@ -490,8 +315,8 @@
                                         f) );
                         tsk.set_wait_callback(
                                 bind(
-					( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
-					this,
+					( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
+					w,
                                         wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
@@ -528,8 +353,8 @@
                                         f) );
                         tsk.set_wait_callback(
                                 bind(
-					( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
-					this,
+					( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
+					w,
                                         wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
@@ -545,11 +370,6 @@
                 }
         }
 };
-
-template< typename Channel >
-thread_specific_ptr< typename pool< Channel >::random_idx >
-pool< Channel >::tss_rnd_idx_;
-
 }}
 
 #endif // BOOST_TP_POOL_H
Modified: sandbox/threadpool/libs/tp/build/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/build/Jamfile.v2	(original)
+++ sandbox/threadpool/libs/tp/build/Jamfile.v2	2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -14,7 +14,7 @@
       <link>static:<define>BOOST_THREADPOOL_STATIC_LINK=1
     ;
 
-SOURCES = callable  default_pool  guard  interrupter  poolsize  scanns  watermark worker ;
+SOURCES = callable default_pool guard interrupter poolsize scanns watermark worker worker_group ;
 
 lib boost_threadpool
    : $(SOURCES).cpp
Modified: sandbox/threadpool/libs/tp/src/worker.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/worker.cpp	(original)
+++ sandbox/threadpool/libs/tp/src/worker.cpp	2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -6,111 +6,6 @@
 {
 thread_specific_ptr< worker > worker::tss_;
 
-worker::impl::impl( function< void() > const& fn)
-:
-thrd_( new thread( fn) ),
-wsq_(),
-shtdwn_sem_( 0),
-shtdwn_now_sem_( 0),
-shtdwn_( false),
-scns_( 0)
-{ BOOST_ASSERT( ! fn.empty() ); }
-
-const shared_ptr< thread >
-worker::impl::thrd() const
-{ return thrd_; }
-
-const thread::id
-worker::impl::get_id() const
-{ return thrd_->get_id(); }
-
-void
-worker::impl::join() const
-{ thrd_->join(); }
-
-void
-worker::impl::interrupt() const
-{ thrd_->interrupt(); }
-
-void
-worker::impl::put(
-	callable const& ca,
-	interrupter const& intr)
-{
-	BOOST_ASSERT( ! ca.empty() );
-	wsq_.put( std::make_pair( ca, intr) );
-}
-
-bool
-worker::impl::try_take(
-	callable & ca,
-	interrupter & intr)
-{
-	item itm;
-	bool result( wsq_.try_take( itm) );
-	if ( result)
-	{
-		ca = itm.first;
-		intr = itm.second;
-	}
-	return result;
-}
-
-bool
-worker::impl::try_steal(
-	callable & ca,
-	interrupter & intr)
-{
-	item itm;
-	bool result( wsq_.try_steal( itm) );
-	if ( result)
-	{
-		ca = itm.first;
-		intr = itm.second;
-	}
-	return result;
-}
-
-bool
-worker::impl::empty()
-{ return wsq_.empty(); }
-
-void
-worker::impl::signal_shutdown()
-{ shtdwn_sem_.post(); }
-
-void
-worker::impl::signal_shutdown_now()
-{ shtdwn_now_sem_.post(); }
-
-bool
-worker::impl::shutdown()
-{
-	if ( ! shtdwn_)
-		shtdwn_ = shtdwn_sem_.try_wait();
-	return shtdwn_;
-}
-
-bool
-worker::impl::shutdown_now()
-{ return shtdwn_now_sem_.try_wait(); }
-
-std::size_t
-worker::impl::scanns() const
-{ return scns_; }
-
-void
-worker::impl::increment_scanns()
-{ ++scns_; }
-
-void
-worker::impl::reset_scanns()
-{ scns_ = 0; }
-
-const shared_ptr< thread >
-worker::thrd() const
-{ return impl_->thrd(); }
-
 const thread::id
 worker::get_id() const
 { return impl_->get_id(); }
@@ -124,6 +19,14 @@
 { impl_->interrupt(); }
 
 void
+worker::signal_shutdown()
+{ impl_->signal_shutdown(); }
+
+void
+worker::signal_shutdown_now()
+{ impl_->signal_shutdown_now(); }
+
+void
 worker::put(
         callable const& ca,
         interrupter const& intr)
@@ -141,49 +44,19 @@
         interrupter & intr)
 { return impl_->try_steal( ca, intr); }
 
-bool
-worker::empty() const
-{ return impl_->empty(); }
-
-void
-worker::signal_shutdown()
-{ impl_->signal_shutdown(); }
-
 void
-worker::signal_shutdown_now()
-{ impl_->signal_shutdown_now(); }
-
-bool
-worker::shutdown()
-{ return impl_->shutdown(); }
-
-bool
-worker::shutdown_now()
-{ return impl_->shutdown_now(); }
-
-std::size_t
-worker::scanns() const
-{ return impl_->scanns(); }
-
-void
-worker::increment_scanns()
-{ impl_->increment_scanns(); }
-
-void
-worker::reset_scanns()
-{ impl_->reset_scanns(); }
+worker::reschedule_until( function< bool() > const& pred)
+{ return impl_->schedule_until( pred); }
 
 void
-worker::reschedule_until( function< bool() > const& pred)
-{ return impl_->reschedule_until( pred); }
+worker::run()
+{
+	worker::tss_.reset( new worker( * this) );
+	impl_->run();
+}
 
 worker *
 worker::tss_get()
 { return worker::tss_.get(); }
-
-void
-worker::tss_reset( worker * w)
-{ worker::tss_.reset( w); }
-
 } } }
 
Added: sandbox/threadpool/libs/tp/src/worker_group.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/worker_group.cpp	2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -0,0 +1,79 @@
+//  Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+//  Software License, Version 1.0. (See accompanying file
+//  LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/tp/detail/worker_group.hpp"
+
+#include <boost/utility.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+worker_group::worker_group()
+:
+cont_(),
+id_idx_( cont_.get< id_idx_tag >() ),
+rnd_idx_( cont_.get< rnd_idx_tag >() )
+{}
+
+const worker
+worker_group::operator[]( std::size_t pos) const
+{ return rnd_idx_[pos]; }
+
+std::size_t
+worker_group::size() const
+{ return cont_.size(); }
+
+const worker_group::iterator
+worker_group::begin()
+{ return id_idx_.begin(); }
+
+const worker_group::const_iterator
+worker_group::begin() const
+{ return id_idx_.begin(); }
+
+const worker_group::iterator
+worker_group::end()
+{ return id_idx_.end(); }
+
+const worker_group::const_iterator
+worker_group::end() const
+{ return id_idx_.end(); }
+
+const worker_group::const_iterator
+worker_group::find( thread::id const& id) const
+{ return id_idx_.find( id); }
+
+void
+worker_group::join_all()
+{
+	BOOST_FOREACH( worker w, cont_)
+	{ w.join(); }
+}
+
+void
+worker_group::interrupt_all()
+{
+	BOOST_FOREACH( worker w, cont_)
+	{ w.interrupt(); }
+}
+
+void
+worker_group::insert( worker const& w)
+{ cont_.insert( w); }
+
+void
+worker_group::signal_shutdown_all()
+{
+	BOOST_FOREACH( worker w, cont_)
+	{ w.signal_shutdown(); }
+}
+
+void
+worker_group::signal_shutdown_now_all()
+{
+	BOOST_FOREACH( worker w, cont_)
+	{ w.signal_shutdown_now(); }
+}
+} } }
+