$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r54901 - in sandbox/task: boost/task boost/task/detail libs/task/doc libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-07-11 15:57:41
Author: olli
Date: 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
New Revision: 54901
URL: http://svn.boost.org/trac/boost/changeset/54901
Log:
* refactoring
Added:
   sandbox/task/boost/task/detail/pool_base.hpp   (contents, props changed)
Text files modified: 
   sandbox/task/boost/task/detail/worker.hpp        |   399 +++++++++++++++++++-------------------- 
   sandbox/task/boost/task/static_pool.hpp          |   322 +-------------------------------        
   sandbox/task/libs/task/doc/overview.qbk          |     2                                         
   sandbox/task/libs/task/doc/todo.qbk              |     9                                         
   sandbox/task/libs/task/src/guard.cpp             |     6                                         
   sandbox/task/libs/task/src/interrupter.cpp       |     3                                         
   sandbox/task/libs/task/src/poolsize.cpp          |     2                                         
   sandbox/task/libs/task/src/scanns.cpp            |     3                                         
   sandbox/task/libs/task/src/semaphore_posix.cpp   |     2                                         
   sandbox/task/libs/task/src/semaphore_windows.cpp |     2                                         
   sandbox/task/libs/task/src/watermark.cpp         |     4                                         
   sandbox/task/libs/task/src/worker.cpp            |     7                                         
   sandbox/task/libs/task/src/worker_group.cpp      |     6                                         
   sandbox/task/libs/task/src/wsq.cpp               |     7                                         
   14 files changed, 239 insertions(+), 535 deletions(-)
Added: sandbox/task/boost/task/detail/pool_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/detail/pool_base.hpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -0,0 +1,381 @@
+
+//          Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+//    (See accompanying file LICENSE_1_0.txt or copy at
+//          http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_DETAIL_POOL_BASE_H
+#define BOOST_TASK_DETAIL_POOL_BASE_H
+
+#include <cstddef>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/config.hpp>
+#include <boost/cstdint.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/detail/move.hpp>
+
+#include <boost/task/detail/atomic.hpp>
+#include <boost/task/detail/bind_processor.hpp>
+#include <boost/task/detail/worker.hpp>
+#include <boost/task/detail/worker_group.hpp>
+
+#include <boost/task/callable.hpp>
+#include <boost/task/context.hpp>
+#include <boost/task/exceptions.hpp>
+#include <boost/task/future.hpp>
+#include <boost/task/handle.hpp>
+#include <boost/task/poolsize.hpp>
+#include <boost/task/scanns.hpp>
+#include <boost/task/task.hpp>
+#include <boost/task/watermark.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost { namespace task {
+namespace detail
+{
+
+// Implementation shared by the pool front ends: owns the worker group and
+// the channel that submitted tasks are put into. Channel provides the queue
+// type together with its nested 'item' type.
+template< typename Channel >
+class pool_base
+{
+private:
+	friend class worker;
+
+	template< typename T, typename X >
+	friend class worker_object;
+
+	typedef Channel					channel;
+	typedef typename channel::item	channel_item;
+	
+	worker_group			wg_;				// workers created by the constructors
+	shared_mutex			mtx_wg_;			// guards wg_
+	volatile uint32_t		state_;				// 0 while open, > 0 once closed
+	channel			 		channel_;			// holds the submitted tasks
+	volatile uint32_t		active_worker_;		// accessed by worker_object (friend)
+	volatile uint32_t		idle_worker_;		// accessed by worker_object (friend)
+
+	// Entry point of a worker thread: finds the worker registered in wg_ for
+	// the calling thread and runs it.
+	void worker_entry_()
+	{
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		typename detail::worker_group::iterator i( wg_.find( this_thread::get_id() ) );
+		BOOST_ASSERT( i != wg_.end() );
+		// copy the worker before unlocking so wg_ is only touched under mtx_wg_
+		worker w( * i);
+		lk.unlock();
+
+		w.run();
+	}
+
+	// Inserts one worker into wg_; the worker is handed worker_entry_() as
+	// the function to run. Callers hold mtx_wg_ exclusively.
+	void create_worker_(
+		poolsize const& psize,
+		posix_time::time_duration const& asleep,
+		scanns const& max_scns)
+	{
+		wg_.insert(
+			worker(
+				* this,
+				psize,
+				asleep,
+				max_scns,
+				boost::bind(
+					& pool_base::worker_entry_,
+					this) ) );
+	}
+
+# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
+	// Worker thread entry point that first binds the thread to processor n.
+	void worker_entry_( std::size_t n)
+	{
+		this_thread::bind_to_processor( n);
+		worker_entry_();
+	}
+
+	// Same as the three-argument overload, but the worker's thread is bound
+	// to processor n.
+	void create_worker_(
+		poolsize const& psize,
+		posix_time::time_duration const& asleep,
+		scanns const& max_scns,
+		std::size_t n)
+	{
+		wg_.insert(
+			worker(
+				* this,
+				psize,
+				asleep,
+				max_scns,
+				boost::bind(
+					& pool_base::worker_entry_,
+					this,
+					n) ) );
+	}
+# endif
+
+	// Number of active workers (value of active_worker_).
+	std::size_t active_() const
+	{ return active_worker_; }
+
+	// Number of workers that are not active: size_() - active_().
+	std::size_t idle_() const
+	{ return size_() - active_(); }
+
+	// Number of workers in wg_.
+	std::size_t size_() const
+	{ return wg_.size(); }
+
+	// True once close_() has been called at least once.
+	bool closed_() const
+	{ return state_ > 0; }
+
+	// Atomically increments state_, which marks the pool as closed.
+	// shutdown() and shutdown_now() return early when this yields true.
+	// NOTE(review): if atomic_fetch_add() returns the previous value, '> 1'
+	// lets the first two racing callers through - confirm against
+	// detail/atomic.hpp whether '> 0' was intended.
+	bool close_()
+	{ return atomic_fetch_add( & state_, 1) > 1; }
+
+public:
+	// Creates a pool of psize workers on a default-constructed channel.
+	// Throws invalid_timeduration if asleep is special or negative.
+	explicit pool_base(
+		poolsize const& psize,
+		posix_time::time_duration const& asleep = posix_time::microseconds( 10),
+		scanns const& max_scns = scanns( 20) )
+	:
+	wg_(),
+	mtx_wg_(),
+	state_( 0),
+	channel_(),
+	active_worker_( 0),
+	idle_worker_( 0)
+	{
+		if ( asleep.is_special() || asleep.is_negative() )
+			throw invalid_timeduration();
+		channel_.activate();
+		// exclusive lock while wg_ is filled; worker_entry_() takes the same
+		// mutex before it looks itself up
+		lock_guard< shared_mutex > lk( mtx_wg_);
+		for ( std::size_t i( 0); i < psize; ++i)
+			create_worker_( psize, asleep, max_scns);
+	}
+
+	// Creates a pool of psize workers on a channel constructed from the
+	// given high and low watermarks.
+	// Throws invalid_timeduration if asleep is special or negative.
+	explicit pool_base(
+		poolsize const& psize,
+		high_watermark const& hwm,
+		low_watermark const& lwm,
+		posix_time::time_duration const& asleep = posix_time::microseconds( 100),
+		scanns const& max_scns = scanns( 20) )
+	:
+	wg_(),
+	mtx_wg_(),
+	state_( 0),
+	channel_(
+		hwm,
+		lwm),
+	active_worker_( 0),
+	idle_worker_( 0)
+	{
+		if ( asleep.is_special() || asleep.is_negative() )
+			throw invalid_timeduration();
+		channel_.activate();
+		lock_guard< shared_mutex > lk( mtx_wg_);
+		for ( std::size_t i( 0); i < psize; ++i)
+			create_worker_( psize, asleep, max_scns);
+	}
+
+# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
+	// Creates one worker per thread::hardware_concurrency(), worker i being
+	// bound to processor i, on a default-constructed channel.
+	// Throws invalid_timeduration if asleep is special or negative.
+	explicit pool_base(
+		posix_time::time_duration const& asleep = posix_time::microseconds( 10),
+		scanns const& max_scns = scanns( 20) )
+	:
+	wg_(),
+	mtx_wg_(),
+	state_( 0),
+	channel_(),
+	active_worker_( 0),
+	idle_worker_( 0)
+	{
+		if ( asleep.is_special() || asleep.is_negative() )
+			throw invalid_timeduration();
+		poolsize psize( thread::hardware_concurrency() );
+		BOOST_ASSERT( psize > 0);
+		channel_.activate();
+		lock_guard< shared_mutex > lk( mtx_wg_);
+		for ( std::size_t i( 0); i < psize; ++i)
+			create_worker_( psize, asleep, max_scns, i);
+	}
+
+	// Processor-bound variant using a channel constructed from the given
+	// high and low watermarks.
+	// Throws invalid_timeduration if asleep is special or negative.
+	explicit pool_base(
+		high_watermark const& hwm,
+		low_watermark const& lwm,
+		posix_time::time_duration const& asleep = posix_time::microseconds( 100),
+		scanns const& max_scns = scanns( 20) )
+	:
+	wg_(),
+	mtx_wg_(),
+	state_( 0),
+	channel_(
+		hwm,
+		lwm),
+	active_worker_( 0),
+	idle_worker_( 0)
+	{
+		if ( asleep.is_special() || asleep.is_negative() )
+			throw invalid_timeduration();
+		poolsize psize( thread::hardware_concurrency() );
+		BOOST_ASSERT( psize > 0);
+		channel_.activate();
+		lock_guard< shared_mutex > lk( mtx_wg_);
+		for ( std::size_t i( 0); i < psize; ++i)
+			create_worker_( psize, asleep, max_scns, i);
+	}
+# endif
+
+	// Performs shutdown() on destruction.
+	~pool_base()
+	{ shutdown(); }
+
+	// Number of active workers.
+	std::size_t active()
+	{
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		return active_();
+	}
+
+	// Number of workers that are not active.
+	std::size_t idle()
+	{
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		return idle_();
+	}
+
+	// Interrupts every worker in wg_; does nothing once the pool is closed.
+	void interrupt_all_worker()
+	{
+		if ( closed_() ) return;
+
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		wg_.interrupt_all();
+	}
+
+	// Closes the pool, deactivates the channel, signals shutdown to all
+	// workers and joins them. Does nothing if the pool is already closed.
+	void shutdown()
+	{
+		if ( closed_() || close_() ) return;
+
+		channel_.deactivate();
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		wg_.signal_shutdown_all();
+		wg_.join_all();
+	}
+
+	// Like shutdown(), but uses deactivate_now()/signal_shutdown_now_all()
+	// and additionally interrupts all workers before joining them.
+	void shutdown_now()
+	{
+		if ( closed_() || close_() ) return;
+
+		channel_.deactivate_now();
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		wg_.signal_shutdown_now_all();
+		wg_.interrupt_all();
+		wg_.join_all();
+	}
+
+	// Number of workers in the pool.
+	std::size_t size()
+	{
+		shared_lock< shared_mutex > lk( mtx_wg_);
+		return size_();
+	}
+
+	// True once the pool has been closed.
+	bool closed()
+	{ return closed_(); }
+
+	// The following members forward to the channel.
+	void clear()
+	{ channel_.clear(); }
+
+	bool empty()
+	{ return channel_.empty(); }
+
+	std::size_t pending()
+	{ return channel_.size(); }
+
+	std::size_t upper_bound()
+	{ return channel_.upper_bound(); }
+
+	void upper_bound( high_watermark const& hwm)
+	{ channel_.upper_bound( hwm); }
+
+	std::size_t lower_bound()
+	{ return channel_.lower_bound(); }
+
+	// Sets the channel's lower bound to lwm.
+	void lower_bound( low_watermark const& lwm)
+	{ channel_.lower_bound( lwm); }
+
+	// Puts t into the channel and returns the handle created from the
+	// task's future. Throws task_rejected if the pool is closed.
+	template< typename R >
+	handle< R > submit( task< R > t)
+	{
+		if ( closed_() )
+			throw task_rejected("pool is closed");
+
+		shared_future< R > f( t.get_future() );
+		context ctx;
+		handle< R > h( ctx.get_handle( f) );
+		channel_.put(
+				ctx.get_callable( boost::move( t) ) );
+		return h;
+	}
+
+	// Same as above, but the callable is wrapped together with attr in a
+	// channel_item before it is put into the channel.
+	// Throws task_rejected if the pool is closed.
+	template< typename R, typename Attr >
+	handle< R > submit( task< R > t, Attr const& attr)
+	{
+		if ( closed_() )
+			throw task_rejected("pool is closed");
+
+		shared_future< R > f( t.get_future() );
+		context ctx;
+		handle< R > h( ctx.get_handle( f) );
+		channel_.put(
+				channel_item(
+					ctx.get_callable( boost::move( t) ),
+					attr) );
+		return h;
+	}
+};
+
+}}}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif // BOOST_TASK_DETAIL_POOL_BASE_H
+
Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp	(original)
+++ sandbox/task/boost/task/detail/worker.hpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -33,259 +33,254 @@
 # pragma warning(disable:4251 4275)
 # endif
 
-namespace boost { namespace task
-{
-
-template< typename Channel >
-class static_pool;
-
+namespace boost { namespace task {
 namespace detail
 {
 
-class BOOST_TASK_DECL worker
+struct worker_base
 {
-private:
-	template< typename Channel >
-	friend class static_pool;
+	virtual ~worker_base() {}
 
-	static thread_specific_ptr< worker >	tss_;
+	virtual const thread::id get_id() const = 0;
 
-	struct worker_base
-	{
-		virtual ~worker_base() {}
+	virtual void join() const = 0;
 
-		virtual const thread::id get_id() const = 0;
+	virtual void interrupt() const = 0;
 
-		virtual void join() const = 0;
+	virtual void put( callable const&) = 0;
 
-		virtual void interrupt() const = 0;
+	virtual bool try_take( callable &) = 0;
 
-		virtual void put( callable const&) = 0;
+	virtual bool try_steal( callable &) = 0;
 
-		virtual bool try_take( callable &) = 0;
+	virtual void signal_shutdown() = 0;
 
-		virtual bool try_steal( callable &) = 0;
+	virtual void signal_shutdown_now() = 0;
 
-		virtual void signal_shutdown() = 0;
+	virtual void reschedule_until( function< bool() > const&) = 0;
 
-		virtual void signal_shutdown_now() = 0;
+	virtual void run() = 0;
+};
+
+template<
+	typename Pool,
+	typename Worker
+>
+class worker_object : public worker_base,
+					  private noncopyable
+{
+private:
+	class random_idx
+	{
+	private:
+		rand48 rng_;
+		uniform_int<> six_;
+		variate_generator< rand48 &, uniform_int<> > die_;
 
-		virtual void reschedule_until( function< bool() > const&) = 0;
+	public:
+		random_idx( std::size_t size)
+		:
+		rng_(),
+		six_( 0, size - 1),
+		die_( rng_, six_)
+		{}
 
-		virtual void run() = 0;
+		std::size_t operator()()
+		{ return die_(); }
         };
 
-	template< typename Pool >
-	class worker_object : public worker_base,
-					      private noncopyable
-	{
-	private:
-		class random_idx
-		{
-		private:
-			rand48 rng_;
-			uniform_int<> six_;
-			variate_generator< rand48 &, uniform_int<> > die_;
-
-		public:
-			random_idx( std::size_t size)
-			:
-			rng_(),
-			six_( 0, size - 1),
-			die_( rng_, six_)
-			{}
-
-			std::size_t operator()()
-			{ return die_(); }
-		};
-
-		Pool					&	pool_;
-		shared_ptr< thread >		thrd_;
-		wsq				 			wsq_;
-		semaphore					shtdwn_sem_;
-		semaphore					shtdwn_now_sem_;
-		bool						shtdwn_;
-		posix_time::time_duration	asleep_;
-		scanns						max_scns_;
-		std::size_t					scns_;
-		random_idx					rnd_idx_;
+	Pool					&	pool_;
+	shared_ptr< thread >		thrd_;
+	wsq				 			wsq_;
+	semaphore					shtdwn_sem_;
+	semaphore					shtdwn_now_sem_;
+	bool						shtdwn_;
+	posix_time::time_duration	asleep_;
+	scanns						max_scns_;
+	std::size_t					scns_;
+	random_idx					rnd_idx_;
 
-		void execute_( callable & ca)
+	void execute_( callable & ca)
+	{
+		BOOST_ASSERT( ! ca.empty() );
+		guard grd( get_pool().active_worker_);
                 {
-			BOOST_ASSERT( ! ca.empty() );
-			guard grd( get_pool().active_worker_);
-			{
-				context_guard lk( ca, thrd_);
-				ca();
-			}
-			ca.clear();
-			BOOST_ASSERT( ca.empty() );
+			context_guard lk( ca, thrd_);
+			ca();
                 }
+		ca.clear();
+		BOOST_ASSERT( ca.empty() );
+	}
 
-		void next_callable_( callable & ca)
+	void next_callable_( callable & ca)
+	{
+		if ( ! try_take( ca) )
                 {
-			if ( ! try_take( ca) )
+			if ( ! get_pool().channel_.try_take( ca) )
                         {
-				if ( ! get_pool().channel_.try_take( ca) )
+				std::size_t idx( rnd_idx_() );
+				for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
                                 {
-					std::size_t idx( rnd_idx_() );
-					for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
-					{
-						worker other( get_pool().wg_[idx]);
-						if ( this_thread::get_id() == other.get_id() ) continue;
-						if ( ++idx >= get_pool().wg_.size() ) idx = 0;
-						if ( other.try_steal( ca) ) break;
-					}
+					Worker other( get_pool().wg_[idx]);
+					if ( ++idx >= get_pool().wg_.size() ) idx = 0; // advance (with wrap-around) first, so the 'continue' below cannot re-probe the same slot
+					if ( this_thread::get_id() == other.get_id() ) continue; // never steal from the calling worker itself
+					if ( other.try_steal( ca) ) break; // stop scanning once a callable was stolen
+				}
 
-					if ( ca.empty() )
+				if ( ca.empty() )
+				{
+					guard grd( get_pool().idle_worker_);
+					if ( shutdown_() ) return;
+					++scns_;
+					if ( scns_ >= max_scns_)
                                         {
-						guard grd( get_pool().idle_worker_);
-						if ( shutdown_() ) return;
-						++scns_;
-						if ( scns_ >= max_scns_)
-						{
-							if ( get_pool().size_() == get_pool().idle_worker_)
-								get_pool().channel_.take( ca, asleep_);
-							else
-								this_thread::sleep( asleep_);
-							scns_ = 0;
-						}
+						if ( get_pool().size_() == get_pool().idle_worker_)
+							get_pool().channel_.take( ca, asleep_);
                                                 else
-							this_thread::yield();
+							this_thread::sleep( asleep_);
+						scns_ = 0;
                                         }
+					else
+						this_thread::yield();
                                 }
                         }
                 }
+	}
 
-		void next_local_callable_( callable & ca)
+	void next_local_callable_( callable & ca)
+	{
+		if ( ! try_take( ca) )
                 {
-			if ( ! try_take( ca) )
+			guard grd( get_pool().idle_worker_);
+			if ( shutdown_() ) return;
+			++scns_;
+			if ( scns_ >= max_scns_)
                         {
-				guard grd( get_pool().idle_worker_);
-				if ( shutdown_() ) return;
-				++scns_;
-				if ( scns_ >= max_scns_)
-				{
-					this_thread::sleep( asleep_);
-					scns_ = 0;
-				}
-				else
-					this_thread::yield();
+				this_thread::sleep( asleep_);
+				scns_ = 0;
                         }
+			else
+				this_thread::yield();
                 }
+	}
 
-		bool shutdown_()
-		{
-			if ( shutdown__() && get_pool().channel_.empty() )
-				return true;
-			else if ( shutdown_now__() )
-				return true;
-			return false;
-		}
+	bool shutdown_()
+	{
+		if ( shutdown__() && get_pool().channel_.empty() )
+			return true;
+		else if ( shutdown_now__() )
+			return true;
+		return false;
+	}
 
-		bool shutdown__()
-		{
-			if ( ! shtdwn_)
-				shtdwn_ = shtdwn_sem_.try_wait();
-			return shtdwn_;
-		}
-		
-		bool shutdown_now__()
-		{ return shtdwn_now_sem_.try_wait(); }
+	bool shutdown__()
+	{
+		if ( ! shtdwn_)
+			shtdwn_ = shtdwn_sem_.try_wait();
+		return shtdwn_;
+	}
+	
+	bool shutdown_now__()
+	{ return shtdwn_now_sem_.try_wait(); }
 
-	public:
-		worker_object(
-			Pool & pool,
-			poolsize const& psize,
-			posix_time::time_duration const& asleep,
-			scanns const& max_scns,
-			function< void() > const& fn)
-		:
-		pool_( pool),
-		thrd_( new thread( fn) ),
-		wsq_(),
-		shtdwn_sem_( 0),
-		shtdwn_now_sem_( 0),
-		shtdwn_( false),
-		asleep_( asleep),
-		max_scns_( max_scns),
-		scns_( 0),
-		rnd_idx_( psize)
-		{ BOOST_ASSERT( ! fn.empty() ); }
-
-		const thread::id get_id() const
-		{ return thrd_->get_id(); }
-
-		void join() const
-		{ thrd_->join(); }
-
-		void
-		interrupt() const
-		{ thrd_->interrupt(); }
-
-		void signal_shutdown()
-		{ shtdwn_sem_.post(); }
-		
-		void signal_shutdown_now()
-		{ shtdwn_now_sem_.post(); }
+public:
+	worker_object(
+		Pool & pool,
+		poolsize const& psize,
+		posix_time::time_duration const& asleep,
+		scanns const& max_scns,
+		function< void() > const& fn)
+	:
+	pool_( pool),
+	thrd_( new thread( fn) ),
+	wsq_(),
+	shtdwn_sem_( 0),
+	shtdwn_now_sem_( 0),
+	shtdwn_( false),
+	asleep_( asleep),
+	max_scns_( max_scns),
+	scns_( 0),
+	rnd_idx_( psize)
+	{ BOOST_ASSERT( ! fn.empty() ); }
+
+	const thread::id get_id() const
+	{ return thrd_->get_id(); }
+
+	void join() const
+	{ thrd_->join(); }
+
+	void
+	interrupt() const
+	{ thrd_->interrupt(); }
+
+	void signal_shutdown()
+	{ shtdwn_sem_.post(); }
+	
+	void signal_shutdown_now()
+	{ shtdwn_now_sem_.post(); }
 
-		void put( callable const& ca)
-		{
-			BOOST_ASSERT( ! ca.empty() );
-			wsq_.put( ca);
-		}
+	void put( callable const& ca)
+	{
+		BOOST_ASSERT( ! ca.empty() );
+		wsq_.put( ca);
+	}
 
-		bool try_take( callable & ca)
-		{
-			callable tmp;
-			bool result( wsq_.try_take( tmp) );
-			if ( result)
-				ca = tmp;
-			return result;
-		}
-		
-		bool try_steal( callable & ca)
-		{
-			callable tmp;
-			bool result( wsq_.try_steal( tmp) );
-			if ( result)
-				ca = tmp;
-			return result;
-		}
+	bool try_take( callable & ca)
+	{
+		callable tmp;
+		bool result( wsq_.try_take( tmp) );
+		if ( result)
+			ca = tmp;
+		return result;
+	}
+	
+	bool try_steal( callable & ca)
+	{
+		callable tmp;
+		bool result( wsq_.try_steal( tmp) );
+		if ( result)
+			ca = tmp;
+		return result;
+	}
 
-		Pool & get_pool() const
-		{ return pool_; }
+	Pool & get_pool() const
+	{ return pool_; }
 
-		void run()
-		{
-			BOOST_ASSERT( get_id() == this_thread::get_id() );
+	void run()
+	{
+		BOOST_ASSERT( get_id() == this_thread::get_id() );
 
-			callable ca;
-			while ( ! shutdown_() )
+		callable ca;
+		while ( ! shutdown_() )
+		{
+			next_callable_( ca);
+			if( ! ca.empty() )
                         {
-				next_callable_( ca);
-				if( ! ca.empty() )
-				{
-					execute_( ca);
-					scns_ = 0;
-				}
+				execute_( ca);
+				scns_ = 0;
                         }
                 }
+	}
 
-		void reschedule_until( function< bool() > const& pred)
+	void reschedule_until( function< bool() > const& pred)
+	{
+		callable ca;
+		while ( ! pred() )
                 {
-			callable ca;
-			while ( ! pred() )
+			next_local_callable_( ca);
+			if( ! ca.empty() )
                         {
-				next_local_callable_( ca);
-				if( ! ca.empty() )
-				{
-					execute_( ca);
-					scns_ = 0;
-				}
+				execute_( ca);
+				scns_ = 0;
                         }
                 }
-	};
+	}
+};
+
+class BOOST_TASK_DECL worker
+{
+private:
+	static thread_specific_ptr< worker >	tss_;
 
         shared_ptr< worker_base >	impl_;
 
@@ -299,7 +294,7 @@
                 function< void() > const& fn)
         :
         impl_(
-		new worker_object< Pool >(
+		new worker_object< Pool, worker >(
                         pool,
                         psize,
                         asleep,
@@ -323,7 +318,7 @@
         template< typename Pool >
         Pool & get_pool() const
         {
-		worker_object< Pool > * p( dynamic_cast< worker_object< Pool > * >( impl_.get() ) );
+		worker_object< Pool, worker > * p( dynamic_cast< worker_object< Pool, worker > * >( impl_.get() ) );
                 BOOST_ASSERT( p);
                 return p->get_pool();
         }
Modified: sandbox/task/boost/task/static_pool.hpp
==============================================================================
--- sandbox/task/boost/task/static_pool.hpp	(original)
+++ sandbox/task/boost/task/static_pool.hpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -8,26 +8,16 @@
 #define BOOST_TASK_STATIC_POOL_H
 
 #include <cstddef>
-#include <utility>
-#include <vector>
 
-#include <boost/assert.hpp>
-#include <boost/bind.hpp>
 #include <boost/config.hpp>
-#include <boost/cstdint.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/function.hpp>
-#include <boost/thread.hpp>
 #include <boost/thread/detail/move.hpp>
 
-#include <boost/task/callable.hpp>
-#include <boost/task/context.hpp>
-#include <boost/task/detail/atomic.hpp>
 #include <boost/task/detail/bind_processor.hpp>
-#include <boost/task/detail/worker.hpp>
+#include <boost/task/detail/pool_base.hpp>
+
 #include <boost/task/detail/worker_group.hpp>
 #include <boost/task/exceptions.hpp>
-#include <boost/task/future.hpp>
 #include <boost/task/handle.hpp>
 #include <boost/task/poolsize.hpp>
 #include <boost/task/scanns.hpp>
@@ -45,305 +35,14 @@
         typedef Channel		channel;
 
 private:
-	template< typename Pool >
-	friend struct has_attribute;
-
-	template< typename Pool >
-	friend struct attribute_type;
-
-	friend class detail::worker;
+	template< typename T, typename X >
+	friend class detail::worker_object;
 
-	template< typename T >
-	friend class detail::worker::worker_object;
-
-	typedef typename channel::item	channel_item;
-	
 # if defined(BOOST_HAS_PROCESSOR_BINDINGS)
         struct tag_bind_to_processors {};
 # endif
         
-	class pool_base
-	{
-	private:
-		friend class detail::worker;
-
-		template< typename T >
-		friend class detail::worker::worker_object;
-
-		detail::worker_group	wg_;
-		shared_mutex			mtx_wg_;
-		volatile uint32_t		state_;
-		channel			 		channel_;
-		volatile uint32_t		active_worker_;
-		volatile uint32_t		idle_worker_;
-
-		void worker_entry_()
-		{
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			typename detail::worker_group::iterator i( wg_.find( this_thread::get_id() ) );
-			lk.unlock();
-			BOOST_ASSERT( i != wg_.end() );
-
-			detail::worker w( * i);
-			w.run();
-		}
-
-		void create_worker_(
-			poolsize const& psize,
-			posix_time::time_duration const& asleep,
-			scanns const& max_scns)
-		{
-			wg_.insert(
-				detail::worker(
-					* this,
-					psize,
-					asleep,
-					max_scns,
-					boost::bind(
-						& pool_base::worker_entry_,
-						this) ) );
-		}
-
-# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
-		void worker_entry_( std::size_t n)
-		{
-			this_thread::bind_to_processor( n);
-			worker_entry_();
-		}
-
-		void create_worker_(
-			poolsize const& psize,
-			posix_time::time_duration const& asleep,
-			scanns const& max_scns,
-			std::size_t n)
-		{
-			wg_.insert(
-				detail::worker(
-					* this,
-					psize,
-					asleep,
-					max_scns,
-					boost::bind(
-						& pool_base::worker_entry_,
-						this,
-						n) ) );
-		}
-# endif
-
-		std::size_t active_() const
-		{ return active_worker_; }
-
-		std::size_t idle_() const
-		{ return size_() - active_(); }
-
-		std::size_t size_() const
-		{ return wg_.size(); }
-
-		bool closed_() const
-		{ return state_ > 0; }
-
-		bool close_()
-		{ return detail::atomic_fetch_add( & state_, 1) > 1; }
-
-	public:
-		explicit pool_base(
-			poolsize const& psize,
-			posix_time::time_duration const& asleep = posix_time::microseconds( 10),
-			scanns const& max_scns = scanns( 20) )
-		:
-		wg_(),
-		mtx_wg_(),
-		state_( 0),
-		channel_(),
-		active_worker_( 0),
-		idle_worker_( 0)
-		{
-			if ( asleep.is_special() || asleep.is_negative() )
-				throw invalid_timeduration();
-			channel_.activate();
-			lock_guard< shared_mutex > lk( mtx_wg_);
-			for ( std::size_t i( 0); i < psize; ++i)
-				create_worker_( psize, asleep, max_scns);
-		}
-
-		explicit pool_base(
-			poolsize const& psize,
-			high_watermark const& hwm,
-			low_watermark const& lwm,
-			posix_time::time_duration const& asleep = posix_time::microseconds( 100),
-			scanns const& max_scns = scanns( 20) )
-		:
-		wg_(),
-		mtx_wg_(),
-		state_( 0),
-		channel_(
-			hwm,
-			lwm),
-		active_worker_( 0),
-		idle_worker_( 0)
-		{
-			if ( asleep.is_special() || asleep.is_negative() )
-				throw invalid_timeduration();
-			channel_.activate();
-			lock_guard< shared_mutex > lk( mtx_wg_);
-			for ( std::size_t i( 0); i < psize; ++i)
-				create_worker_( psize, asleep, max_scns);
-		}
-
-# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
-		explicit pool_base(
-			posix_time::time_duration const& asleep = posix_time::microseconds( 10),
-			scanns const& max_scns = scanns( 20) )
-		:
-		wg_(),
-		mtx_wg_(),
-		state_( 0),
-		channel_(),
-		active_worker_( 0),
-		idle_worker_( 0)
-		{
-			if ( asleep.is_special() || asleep.is_negative() )
-				throw invalid_timeduration();
-			poolsize psize( thread::hardware_concurrency() );
-			BOOST_ASSERT( psize > 0);
-			channel_.activate();
-			lock_guard< shared_mutex > lk( mtx_wg_);
-			for ( std::size_t i( 0); i < psize; ++i)
-				create_worker_( psize, asleep, max_scns, i);
-		}
-
-		explicit pool_base(
-			high_watermark const& hwm,
-			low_watermark const& lwm,
-			posix_time::time_duration const& asleep = posix_time::microseconds( 100),
-			scanns const& max_scns = scanns( 20) )
-		:
-		wg_(),
-		mtx_wg_(),
-		state_( 0),
-		channel_(
-			hwm,
-			lwm),
-		active_worker_( 0),
-		idle_worker_( 0)
-		{
-			if ( asleep.is_special() || asleep.is_negative() )
-				throw invalid_timeduration();
-			poolsize psize( thread::hardware_concurrency() );
-			BOOST_ASSERT( psize > 0);
-			channel_.activate();
-			lock_guard< shared_mutex > lk( mtx_wg_);
-			for ( std::size_t i( 0); i < psize; ++i)
-				create_worker_( psize, asleep, max_scns, i);
-		}
-# endif
-
-		~pool_base()
-		{ shutdown(); }
-
-		std::size_t active()
-		{
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			return active_();
-		}
-
-		std::size_t idle()
-		{
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			return idle_();
-		}
-
-		void interrupt_all_worker()
-		{
-			if ( closed_() ) return;
-
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			wg_.interrupt_all();
-		}
-
-		void shutdown()
-		{
-			if ( closed_() || close_() ) return;
-
-			channel_.deactivate();
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			wg_.signal_shutdown_all();
-			wg_.join_all();
-		}
-
-		const void shutdown_now()
-		{
-			if ( closed_() || close_() ) return;
-
-			channel_.deactivate_now();
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			wg_.signal_shutdown_now_all();
-			wg_.interrupt_all();
-			wg_.join_all();
-		}
-
-		std::size_t size()
-		{
-			shared_lock< shared_mutex > lk( mtx_wg_);
-			return size_();
-		}
-
-		bool closed()
-		{ return closed_(); }
-
-		void clear()
-		{ channel_.clear(); }
-
-		bool empty()
-		{ return channel_.empty(); }
-
-		std::size_t pending()
-		{ return channel_.size(); }
-
-		std::size_t upper_bound()
-		{ return channel_.upper_bound(); }
-
-		void upper_bound( high_watermark const& hwm)
-		{ channel_.upper_bound( hwm); }
-
-		std::size_t lower_bound()
-		{ return channel_.lower_bound(); }
-
-		void lower_bound( low_watermark const lwm)
-		{ channel_.lower_bound( lwm); }
-
-		template< typename R >
-		handle< R > submit( task< R > t)
-		{
-			if ( closed_() )
-				throw task_rejected("pool is closed");
-
-			shared_future< R > f( t.get_future() );
-			context ctx;
-			handle< R > h( ctx.get_handle( f) );
-			channel_.put(
-					ctx.get_callable( boost::move( t) ) );
-			return h;
-		}
-
-		template< typename R, typename Attr >
-		handle< R > submit( task< R > t, Attr const& attr)
-		{
-			if ( closed_() )
-				throw task_rejected("pool is closed");
-
-			shared_future< R > f( t.get_future() );
-			context ctx;
-			handle< R > h( ctx.get_handle( f) );
-			channel_.put(
-					channel_item(
-						ctx.get_callable( boost::move( t) ),
-						attr) );
-			return h;
-		}
-	};
-	
-	shared_ptr< pool_base >		pool_;
+	shared_ptr< detail::pool_base< Channel > >		pool_;
 
         static_pool( static_pool &);
         static_pool & operator=( static_pool &);
@@ -357,7 +56,7 @@
                 poolsize const& psize,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 10),
                 scanns const& max_scns = scanns( 20) )
-	: pool_( new pool_base( psize, asleep, max_scns) )
+	: pool_( new detail::pool_base< Channel >( psize, asleep, max_scns) )
         {}
 
         explicit static_pool(
@@ -366,7 +65,7 @@
                 low_watermark const& lwm,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 100),
                 scanns const& max_scns = scanns( 20) )
-	: pool_( new pool_base( psize, hwm, lwm, asleep, max_scns) )
+	: pool_( new detail::pool_base< Channel >( psize, hwm, lwm, asleep, max_scns) )
         {}
 
 # if defined(BOOST_HAS_PROCESSOR_BINDINGS)
@@ -374,7 +73,7 @@
                 tag_bind_to_processors,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 10),
                 scanns const& max_scns = scanns( 20) )
-	: pool_( new pool_base( asleep, max_scns) )
+	: pool_( new detail::pool_base< Channel >( asleep, max_scns) )
         {}
 
         explicit static_pool(
@@ -383,7 +82,7 @@
                 low_watermark const& lwm,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 100),
                 scanns const& max_scns = scanns( 20) )
-	: pool_( new pool_base( hwm, lwm, asleep, max_scns) )
+	: pool_( new detail::pool_base< Channel >( hwm, lwm, asleep, max_scns) )
         {}
 
         static tag_bind_to_processors bind_to_processors()
@@ -540,7 +239,7 @@
                 return pool_->submit( boost::move( t), attr);
         }
 
-	typedef typename shared_ptr< pool_base >::unspecified_bool_type	unspecified_bool_type;
+	typedef typename shared_ptr< detail::pool_base< Channel > >::unspecified_bool_type	unspecified_bool_type;
 
         operator unspecified_bool_type() const // throw()
         { return pool_; }
@@ -566,6 +265,7 @@
 task::static_pool< Channel >  move( boost::detail::thread_move_t< task::static_pool< Channel > > t)
 { return task::static_pool< Channel >( t); }
 # endif
+
 }
 
 #include <boost/config/abi_suffix.hpp>
Modified: sandbox/task/libs/task/doc/overview.qbk
==============================================================================
--- sandbox/task/libs/task/doc/overview.qbk	(original)
+++ sandbox/task/libs/task/doc/overview.qbk	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -133,6 +133,8 @@
 
 [note __boost_task__ uses __boost_future__ from Anthony Williams (will be integrated in some of the next releases of __boost_thread__).]
 
+[warning This library is NOT an official Boost library]
+
 [note Please note that __boost_task__ is not optimized yet.]
 
 
Modified: sandbox/task/libs/task/doc/todo.qbk
==============================================================================
--- sandbox/task/libs/task/doc/todo.qbk	(original)
+++ sandbox/task/libs/task/doc/todo.qbk	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -39,16 +39,15 @@
 
 [heading Interdepended task]
 
-* With special support of __boost_fiber__ interdepended tasks (using communication and synchronisation abstractions above) work in 
-__thread_pools__ without deadlocking the pool. 
+* integration of __boost_fiber__ - interdependent tasks (using the communication and synchronisation abstractions above) can then work in 
+__thread_pools__ without deadlocking the pool
 
 
 [heading Optimizations]
  
-* two-lock-queue as global queue in __thread_pool__
-
-* maybe lock-free-queue as global queue too (how to provide the scheduling policies fifo, priority, smart?)
+* finer-grained bounded_channel and unbounded_channel using two-lock-queue
 
+* lock-free-queue with fifo ordering as channel
 
 
 [endsect]
Modified: sandbox/task/libs/task/src/guard.cpp
==============================================================================
--- sandbox/task/libs/task/src/guard.cpp	(original)
+++ sandbox/task/libs/task/src/guard.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,10 +10,10 @@
 
 #include <boost/task/detail/atomic.hpp>
 
-namespace boost { namespace task
-{
+namespace boost { namespace task {
 namespace detail
 {
+
 guard::guard( volatile uint32_t & active_worker)
 : active_worker_( active_worker)
 {
@@ -26,5 +26,5 @@
         atomic_fetch_sub( & active_worker_, 1);
         BOOST_ASSERT( active_worker_ >= 0);
 }
-} } }
 
+}}}
Modified: sandbox/task/libs/task/src/interrupter.cpp
==============================================================================
--- sandbox/task/libs/task/src/interrupter.cpp	(original)
+++ sandbox/task/libs/task/src/interrupter.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -8,8 +8,7 @@
 
 #include <boost/assert.hpp>
 
-namespace boost { namespace task
-{
+namespace boost { namespace task {
 namespace detail
 {
 
Modified: sandbox/task/libs/task/src/poolsize.cpp
==============================================================================
--- sandbox/task/libs/task/src/poolsize.cpp	(original)
+++ sandbox/task/libs/task/src/poolsize.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,11 +10,13 @@
 
 namespace boost { namespace task
 {
+
 poolsize::poolsize( std::size_t value)
 : value_( value)
 { if ( value <= 0) throw invalid_poolsize(); }
 
 poolsize::operator std::size_t () const
 { return value_; }
+
 } }
 
Modified: sandbox/task/libs/task/src/scanns.cpp
==============================================================================
--- sandbox/task/libs/task/src/scanns.cpp	(original)
+++ sandbox/task/libs/task/src/scanns.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,11 +10,12 @@
 
 namespace boost { namespace task
 {
+
 scanns::scanns( std::size_t value)
 : value_( value)
 { if ( value < 0) throw invalid_scanns(); }
 
 scanns::operator std::size_t () const
 { return value_; }
-} }
 
+}}
Modified: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_posix.cpp	(original)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -15,6 +15,7 @@
 
 namespace boost { namespace task
 {
+
 semaphore::semaphore( int value)
 : handle_()
 {
@@ -81,4 +82,5 @@
                 throw system::system_error( errno, system::system_category);
         return value;
 }
+
 }}
Modified: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_windows.cpp	(original)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -16,6 +16,7 @@
 
 namespace boost { namespace task
 {
+
 semaphore::semaphore( int value)
 : handle_()
 {
@@ -77,4 +78,5 @@
         }
         return value;
 }
+
 }}
Modified: sandbox/task/libs/task/src/watermark.cpp
==============================================================================
--- sandbox/task/libs/task/src/watermark.cpp	(original)
+++ sandbox/task/libs/task/src/watermark.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,6 +10,7 @@
 
 namespace boost { namespace task
 {
+
 high_watermark::high_watermark( std::size_t value)
 : value_( value)
 {
@@ -29,4 +30,5 @@
 
 low_watermark::operator std::size_t () const
 { return value_; }
-} }
+
+}}
Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp	(original)
+++ sandbox/task/libs/task/src/worker.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -6,10 +6,10 @@
 
 #include "boost/task/detail/worker.hpp"
 
-namespace boost { namespace task
-{
+namespace boost { namespace task {
 namespace detail
 {
+
 thread_specific_ptr< worker > worker::tss_;
 
 const thread::id
@@ -51,6 +51,7 @@
 void
 worker::run()
 {
+	// FIXME: ugly
         worker::tss_.reset( new worker( * this) );
         impl_->run();
 }
@@ -58,5 +59,5 @@
 worker *
 worker::tss_get()
 { return worker::tss_.get(); }
-} } }
 
+}}}
Modified: sandbox/task/libs/task/src/worker_group.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker_group.cpp	(original)
+++ sandbox/task/libs/task/src/worker_group.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -9,10 +9,10 @@
 #include <boost/foreach.hpp>
 #include <boost/utility.hpp>
 
-namespace boost { namespace task
-{
+namespace boost { namespace task {
 namespace detail
 {
+
 worker_group::worker_group()
 :
 cont_(),
@@ -99,5 +99,5 @@
         BOOST_FOREACH( worker w, cont_)
         { w.signal_shutdown_now(); }
 }
-} } }
 
+}}}
Modified: sandbox/task/libs/task/src/wsq.cpp
==============================================================================
--- sandbox/task/libs/task/src/wsq.cpp	(original)
+++ sandbox/task/libs/task/src/wsq.cpp	2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,10 +10,10 @@
 
 #include <boost/task/detail/atomic.hpp>
 
-namespace boost { namespace task
-{
+namespace boost { namespace task {
 namespace detail
 {
+
 wsq::wsq()
 :
 initial_size_( 32),
@@ -114,6 +114,5 @@
         }
         return false;
 }
-} } }
-
 
+}}}