$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r51614 - in sandbox/threadpool: boost/tp boost/tp/detail libs/tp/build libs/tp/examples libs/tp/src
From: oliver.kowalke_at_[hidden]
Date: 2009-03-04 17:52:44
Author: olli
Date: 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
New Revision: 51614
URL: http://svn.boost.org/trac/boost/changeset/51614
Log:
* worker moved from pool.hpp into worker.hpp
* dependencyy between pool and worker reduced
* this_task::reschedule_until() expects an callable object
* this_task::is_valid() returns if current thread is a valid worker thread
Added:
   sandbox/threadpool/boost/tp/detail/worker.hpp   (contents, props changed)
   sandbox/threadpool/libs/tp/src/worker.cpp   (contents, props changed)
Text files modified: 
   sandbox/threadpool/boost/tp/pool.hpp                     |   345 ++++++++------------------------------- 
   sandbox/threadpool/libs/tp/build/Jamfile.v2              |     2                                         
   sandbox/threadpool/libs/tp/examples/reschedule_until.cpp |    31 ++                                      
   3 files changed, 96 insertions(+), 282 deletions(-)
Added: sandbox/threadpool/boost/tp/detail/worker.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker.hpp	2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -0,0 +1,156 @@
+//  Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+//  Software License, Version 1.0. (See accompanying file
+//  LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_H
+#define BOOST_TP_DETAIL_WORKER_H
+
+#include <cstddef>
+#include <utility>
+
+#include <boost/assert.hpp>
+#include <boost/function.hpp>
+#include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/wsq.hpp>
+
+namespace boost {
+namespace tp {
+namespace detail
+{
+class worker
+{
+private:
+	static thread_specific_ptr< worker >	tss_;
+
+	class impl : private noncopyable
+	{
+	private:
+		typedef std::pair< callable, interrupter >	item;
+		shared_ptr< thread >						thrd_;
+		wsq< item > 								wsq_;
+		interprocess::interprocess_semaphore		shtdwn_sem_;
+		interprocess::interprocess_semaphore		shtdwn_now_sem_;
+		bool										shtdwn_;
+		std::size_t									scns_;
+
+	public:
+		impl( function< void() > const&);
+
+		virtual ~impl() {}
+
+		const shared_ptr< thread > thrd() const;
+
+		const thread::id get_id() const;
+
+		void join() const;
+
+		void interrupt() const;
+
+		void put( callable const&, interrupter const&);
+
+		bool try_take( callable &, interrupter &);
+
+		bool try_steal( callable &, interrupter &);
+
+		bool empty();
+
+		void signal_shutdown();
+
+		void signal_shutdown_now();
+
+		bool shutdown();
+
+		bool shutdown_now();
+
+		std::size_t scanns() const;
+
+		void increment_scanns();
+
+		void reset_scanns();
+
+		virtual void reschedule_until( function< bool() > const&) = 0;
+	};
+
+	template< typename Pool >
+	class impl_pool : public impl
+	{
+	private:
+		Pool	&	p_;
+
+	public:
+		impl_pool( Pool & p, function< void() > const& fn)
+		: impl( fn), p_( p)
+		{}
+
+		void reschedule_until( function< bool() > const& pred)
+		{ p_.reschedule_until_( pred); }
+
+		Pool & get_pool() const
+		{ return p_; }
+	};
+
+	shared_ptr< impl >	impl_;
+
+public:
+	template< typename Pool >
+	worker(
+		Pool & pool,
+		function< void() > const& fn)
+	: impl_( new impl_pool< Pool >( pool, fn) )
+	{}
+
+	const shared_ptr< thread > thrd() const;
+
+	const thread::id get_id() const;
+
+	void join() const;
+
+	void interrupt() const;
+
+	void put( callable const&, interrupter const&);
+
+	bool try_take( callable &, interrupter &);
+
+	bool try_steal( callable &, interrupter &);
+
+	bool empty() const;
+
+	void signal_shutdown();
+
+	void signal_shutdown_now();
+
+	bool shutdown();
+
+	bool shutdown_now();
+
+	std::size_t scanns() const;
+
+	void increment_scanns();
+
+	void reset_scanns();
+
+	void reschedule_until( function< bool() > const&);
+
+	template< typename Pool >
+	Pool & get_thread_pool() const
+	{
+		impl_pool< Pool > * p( dynamic_cast< impl_pool< Pool > * >( impl_.get() ) );
+		BOOST_ASSERT( p);
+		return p->get_pool();
+	}
+
+	static worker * tss_get();
+
+	static void tss_reset( worker * w);
+};
+
+} } }
+
+#endif // BOOST_TP_DETAIL_WORKER_H
+
Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp	(original)
+++ sandbox/threadpool/boost/tp/pool.hpp	2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -16,7 +16,6 @@
 #include <boost/bind.hpp>
 #include <boost/foreach.hpp>
 #include <boost/function.hpp>
-#include <boost/interprocess/sync/interprocess_semaphore.hpp>
 #include <boost/multi_index_container.hpp>
 #include <boost/multi_index/mem_fun.hpp>
 #include <boost/multi_index/ordered_index.hpp>
@@ -24,13 +23,8 @@
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/random.hpp>
 #include <boost/ref.hpp>
-#include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
-#include <boost/thread/condition.hpp>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/shared_mutex.hpp>
-#include <boost/thread/tss.hpp>
 #include <boost/utility.hpp>
 #include <boost/utility/result_of.hpp>
 
@@ -40,7 +34,7 @@
 #ifdef BOOST_BIND_WORKER_TO_PROCESSORS
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
-#include <boost/tp/detail/wsq.hpp>
+#include <boost/tp/detail/worker.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/poolsize.hpp>
 #include <boost/tp/scanns.hpp>
@@ -48,38 +42,27 @@
 #include <boost/tp/watermark.hpp>
 
 namespace boost {
-
 namespace this_task
 {
-template<
-	typename Pool,
-	typename R
->
-void reschedule_until( unique_future< R > const& f)
-{
-	typename Pool::worker * w( Pool::tss_worker_.get() );
-	BOOST_ASSERT( w);
-	w->reschedule_until( f);
-}
-
-template<
-	typename Pool,
-	typename R
->
-void reschedule_until( shared_future< R > const& f)
+template< typename Pred >
+void reschedule_until( Pred const& pred)
 {
-	typename Pool::worker * w( Pool::tss_worker_.get() );
+	tp::detail::worker * w( tp::detail::worker::tss_get() );
         BOOST_ASSERT( w);
-	w->reschedule_until( f);
+	w->reschedule_until( pred);
 }
 
 template< typename Pool >
 Pool & get_thread_pool()
 {
-	typename Pool::worker * w( Pool::tss_worker_.get() );
+	tp::detail::worker * w( tp::detail::worker::tss_get() );
         BOOST_ASSERT( w);
-	return w->get_thread_pool();
+	return w->get_thread_pool< Pool >();
 }
+
+inline
+bool is_valid()
+{ return tp::detail::worker::tss_get() != 0; }
 }
 
 namespace tp
@@ -88,21 +71,14 @@
 class pool : private noncopyable
 {
 private:
-	template<
-		typename Pool,
-		typename R
-	>
-	friend void this_task::reschedule_until( unique_future< R > const&);
-	
-	template<
-		typename Pool,
-		typename R
-	>
-	friend void this_task::reschedule_until( shared_future< R > const&);
+	template< typename Pred >
+	friend void this_task::reschedule_until( Pred const&);
 
         template< typename Pool >
         friend Pool & this_task::get_thread_pool();
 
+	friend class detail::worker;
+
         typedef Channel						channel;
         typedef typename channel::item		channel_item;
 
@@ -113,200 +89,18 @@
                 terminated_state
         };
 
-	class worker
-	{
-	private:
-		class wimpl : private noncopyable
-		{
-		private:
-			typedef std::pair< detail::callable, detail::interrupter >	item;
-
-			pool							*		pool_ptr_;
-			shared_ptr< thread >					thrd_;
-			detail::wsq< item > 					wsq_;
-			interprocess::interprocess_semaphore	shtdwn_sem_;
-			interprocess::interprocess_semaphore	shtdwn_now_sem_;
-			bool									shtdwn_;
-			std::size_t								scns_;
-
-		public:
-			wimpl(
-				pool * pool_ptr,
-				function< void() > const& fn)
-			:
-			pool_ptr_( pool_ptr),
-			thrd_( new thread( fn) ),
-			wsq_(),
-			shtdwn_sem_( 0),
-			shtdwn_now_sem_( 0),
-			shtdwn_( false),
-			scns_( 0)
-			{ BOOST_ASSERT( ! fn.empty() ); }
-
-			const shared_ptr< thread > thrd() const
-			{ return thrd_; }
-
-			const thread::id get_id() const
-			{ return thrd_->get_id(); }
-
-			void join() const
-			{ thrd_->join(); }
-
-			void interrupt() const
-			{ thrd_->interrupt(); }
-
-			void put(
-				detail::callable const& ca,
-				detail::interrupter const& intr)
-			{
-				BOOST_ASSERT( ! ca.empty() );
-				wsq_.put( std::make_pair( ca, intr) );
-			}
-
-			bool try_take(
-				detail::callable & ca,
-				detail::interrupter & intr)
-			{
-				item itm;
-				bool result( wsq_.try_take( itm) );
-				if ( result)
-				{
-					ca = itm.first;
-					intr = itm.second;
-				}
-				return result;
-			}
-
-			bool try_steal(
-				detail::callable & ca,
-				detail::interrupter & intr)
-			{
-				item itm;
-				bool result( wsq_.try_steal( itm) );
-				if ( result)
-				{
-					ca = itm.first;
-					intr = itm.second;
-				}
-				return result;
-			}
-
-			bool empty()
-			{ return wsq_.empty(); }
-
-			void signal_shutdown()
-			{ shtdwn_sem_.post(); }
-
-			void signal_shutdown_now()
-			{ shtdwn_now_sem_.post(); }
-
-			bool shutdown()
-			{
-				if ( ! shtdwn_)
-					shtdwn_ = shtdwn_sem_.try_wait();
-				return shtdwn_;
-			}
-
-			bool shutdown_now()
-			{ return shtdwn_now_sem_.try_wait(); }
-
-			std::size_t scanns() const
-			{ return scns_; }
-
-			void increment_scanns()
-			{ ++scns_; }
-
-			void reset_scanns()
-			{ scns_ = 0; }
-
-			template< typename F >
-			void reschedule_until( F const& f)
-			{ pool_ptr_->reschedule_until_( f); }
-
-			pool & get_thread_pool()
-			{ return * pool_ptr_; }
-		};
-
-		shared_ptr< wimpl >	impl_;
-
-	public:
-		worker(
-			pool * pool_ptr,
-			function< void() > const& fn)
-		: impl_( new wimpl( pool_ptr, fn) )
-		{}
-
-		const shared_ptr< thread > thrd() const
-		{ return impl_->thrd(); }
-
-		const thread::id get_id() const
-		{ return impl_->get_id(); }
-
-		void join() const
-		{ impl_->join(); }
-
-		void interrupt() const
-		{ impl_->interrupt(); }
-
-		void put(
-			detail::callable const& ca,
-			detail::interrupter const& intr)
-		{ impl_->put( ca, intr); }
-
-		bool try_take(
-			detail::callable & ca,
-			detail::interrupter & intr)
-		{ return impl_->try_take( ca, intr); }
-
-		bool try_steal(
-			detail::callable & ca,
-			detail::interrupter & intr)
-		{ return impl_->try_steal( ca, intr); }
-
-		bool empty() const
-		{ return impl_->empty(); }
-
-		void signal_shutdown()
-		{ impl_->signal_shutdown(); }
-
-		void signal_shutdown_now()
-		{ impl_->signal_shutdown_now(); }
-
-		bool shutdown()
-		{ return impl_->shutdown(); }
-
-		bool shutdown_now()
-		{ return impl_->shutdown_now(); }
-
-		std::size_t scanns() const
-		{ return impl_->scanns(); }
-
-		void increment_scanns()
-		{ impl_->increment_scanns(); }
-
-		void reset_scanns()
-		{ impl_->reset_scanns(); }
-
-		template< typename F >
-		void reschedule_until( F const& f)
-		{ return impl_->reschedule_until( f); }
-
-		pool & get_thread_pool()
-		{ return impl_->get_thread_pool(); }
-	};
-
         struct id_idx_tag {};
         struct rnd_idx_tag {};
 
         typedef multi_index::multi_index_container<
-		worker,
+		detail::worker,
                 multi_index::indexed_by<
                         multi_index::ordered_unique<
                                 multi_index::tag< id_idx_tag >,
                                 multi_index::const_mem_fun<
-					worker,
+					detail::worker,
                                         const thread::id,
-					& worker::get_id
+					& detail::worker::get_id
 				>
 			>,
                         multi_index::random_access< multi_index::tag< rnd_idx_tag > >
@@ -326,10 +120,10 @@
                 variate_generator< rand48 &, uniform_int<> > die_;
 
         public:
-		random_idx( worker_list & lst)
+		random_idx( std::size_t size)
                 :
                 rng_(),
-		six_( 0, lst.size() - 1),
+		six_( 0, size - 1),
                 die_( rng_, six_)
                 {}
 
@@ -337,19 +131,18 @@
                 { return die_(); }
         };
 
-	static thread_specific_ptr< worker >		tss_worker_;
         static thread_specific_ptr< random_idx >	tss_rnd_idx_;
 
-	worker_list								worker_;
-	shared_mutex							mtx_worker_;
-	state									state_;
-	shared_mutex							mtx_state_;
-	channel		 							channel_;
-	posix_time::time_duration				asleep_;
-	scanns									scns_;
-	volatile uint32_t						active_worker_;
-	volatile uint32_t						idle_worker_;
-	volatile uint32_t						running_worker_;
+	worker_list									worker_;
+	shared_mutex								mtx_worker_;
+	state										state_;
+	shared_mutex								mtx_state_;
+	channel		 								channel_;
+	posix_time::time_duration					asleep_;
+	scanns										scns_;
+	volatile uint32_t							active_worker_;
+	volatile uint32_t							idle_worker_;
+	volatile uint32_t							running_worker_;
 
         void execute_(
                 detail::callable & ca,
@@ -369,7 +162,7 @@
                 BOOST_ASSERT( ca.empty() );
         }
 
-	void next_callable_( worker & w, detail::callable & ca, detail::interrupter & intr)
+	void next_callable_( detail::worker & w, detail::callable & ca, detail::interrupter & intr)
         {
                 rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
                 if ( ! w.try_take( ca, intr) )
@@ -379,7 +172,7 @@
                                 std::size_t idx( ( * tss_rnd_idx_)() );
                                 for ( std::size_t j( 0); j < worker_.size(); ++j)
                                 {
-					worker other( ridx[idx]);
+					detail::worker other( ridx[idx]);
                                         if ( this_thread::get_id() == other.get_id() ) continue;
                                         if ( ++idx >= worker_.size() ) idx = 0;
                                         if ( other.try_steal( ca, intr) ) break;
@@ -405,16 +198,16 @@
                 }
         }
 
-	template< typename F >
-	void reschedule_until_( F const& f)
+	void reschedule_until_( function< bool() > const& pred)
         {
-		worker * w( tss_worker_.get() );
+		detail::worker * w( detail::worker::tss_get() );
                 BOOST_ASSERT( w);
+		BOOST_ASSERT( w->get_id() == this_thread::get_id() );
                 shared_ptr< thread > thrd( w->thrd() );
                 BOOST_ASSERT( thrd);
                 detail::interrupter intr;
                 detail::callable ca;
-		while ( ! f.is_ready() )
+		while ( ! pred() )
                 {
                         next_callable_( * w, ca, intr);
                         if( ! ca.empty() )
@@ -432,26 +225,27 @@
                 typename id_idx::iterator i( iidx.find( this_thread::get_id() ) );
                 lk.unlock();
                 BOOST_ASSERT( i != iidx.end() );
-
-		worker w( * i);
-		BOOST_ASSERT( w.get_id() == this_thread::get_id() );
-		tss_worker_.reset( new worker( w) );
-		shared_ptr< thread > thrd( w.thrd() );
+		detail::worker::tss_reset( new detail::worker( * i) );
+		
+		detail::worker * w( detail::worker::tss_get() );
+		BOOST_ASSERT( w);	
+		BOOST_ASSERT( w->get_id() == this_thread::get_id() );
+		shared_ptr< thread > thrd( w->thrd() );
                 BOOST_ASSERT( thrd);
                 detail::callable ca;
                 detail::interrupter intr;
 
-		pool::tss_rnd_idx_.reset( new random_idx( worker_) );
+		tss_rnd_idx_.reset( new random_idx( worker_.size() ) );
 
                 detail::guard grd( running_worker_);
 
-		while ( ! shutdown_( w) )
+		while ( ! shutdown_( * w) )
                 {
-			next_callable_( w, ca, intr);
+			next_callable_( * w, ca, intr);
                         if( ! ca.empty() )
                         {
                                 execute_( ca, intr, thrd);
-				w.reset_scanns();
+				w->reset_scanns();
                         }
                 }
         }
@@ -460,8 +254,8 @@
         {
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
                 worker_.insert(
-			worker(
-				this,
+			detail::worker(
+				* this,
                                 boost::bind(
                                         & pool::entry_,
                                         this) ) );
@@ -477,14 +271,13 @@
         void create_worker_( std::size_t n)
         {
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
-		worker w(
-				this,
+		worker_.insert(
+			detail::worker(
+				* this,
                                 boost::bind(
                                         & pool::entry_,
                                         this,
-					n) );
-		worker_.insert(
-			w );
+					n) ) );
         }
 #endif
 
@@ -503,7 +296,7 @@
         bool terminateing_() const
         { return state_ == terminateing_state; }
 
-	bool shutdown_( worker & w)
+	bool shutdown_( detail::worker & w)
         {
                 if ( w.shutdown() && channel_.empty() )
                         return true;
@@ -649,9 +442,9 @@
 
                 channel_.deactivate();
                 shared_lock< shared_mutex > lk2( mtx_worker_);
-		BOOST_FOREACH( worker w, worker_)
+		BOOST_FOREACH( detail::worker w, worker_)
                 { w.signal_shutdown(); }
-		BOOST_FOREACH( worker w, worker_)
+		BOOST_FOREACH( detail::worker w, worker_)
                 { w.join(); }
                 lk2.unlock();
 
@@ -668,12 +461,12 @@
 
                 channel_.deactivate_now();
                 shared_lock< shared_mutex > lk2( mtx_worker_);
-		BOOST_FOREACH( worker w, worker_)
+		BOOST_FOREACH( detail::worker w, worker_)
                 {
                         w.signal_shutdown_now();
                         w.interrupt();
                 }
-		BOOST_FOREACH( worker w, worker_)
+		BOOST_FOREACH( detail::worker w, worker_)
                 { w.join(); }
                 lk2.unlock();
                 std::vector< detail::callable > drain( channel_.drain() );
@@ -731,14 +524,18 @@
                 detail::interrupter intr;
                 packaged_task< R > tsk( act);
                 shared_future< R > f( tsk.get_future() );
-		worker * w( tss_worker_.get() );
+		detail::worker * w( detail::worker::tss_get() );
                 if ( w)
                 {
+			function< bool() > wcb(
+				bind(
+					& shared_future< R >::is_ready,
+					f) );
                         tsk.set_wait_callback(
                                 bind(
-					( void ( pool::*)( shared_future< R > const&) ) & pool::reschedule_until_,
+					( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
                                         this,
-					f) );
+					wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
                 }
@@ -768,14 +565,18 @@
                 detail::interrupter intr;
                 packaged_task< R > tsk( act);
                 shared_future< R > f( tsk.get_future() );
-		worker * w( tss_worker_.get() );
+		detail::worker * w( detail::worker::tss_get() );
                 if ( w)
                 {
+			function< bool() > wcb(
+				bind(
+					& shared_future< R >::is_ready,
+					f) );
                         tsk.set_wait_callback(
                                 bind(
-					( void ( pool::*)( shared_future< R > const&) ) & pool::reschedule_until_,
+					( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
                                         this,
-					f) );
+					wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
                 }
@@ -795,10 +596,6 @@
 };
 
 template< typename Channel >
-thread_specific_ptr< typename pool< Channel >::worker >
-pool< Channel >::tss_worker_;
-
-template< typename Channel >
 thread_specific_ptr< typename pool< Channel >::random_idx >
 pool< Channel >::tss_rnd_idx_;
 
Modified: sandbox/threadpool/libs/tp/build/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/build/Jamfile.v2	(original)
+++ sandbox/threadpool/libs/tp/build/Jamfile.v2	2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -14,7 +14,7 @@
       <link>static:<define>BOOST_THREADPOOL_STATIC_LINK=1
     ;
 
-SOURCES = callable  default_pool  guard  interrupter  poolsize  scanns  watermark ;
+SOURCES = callable  default_pool  guard  interrupter  poolsize  scanns  watermark worker ;
 
 lib boost_threadpool
    : $(SOURCES).cpp
Modified: sandbox/threadpool/libs/tp/examples/reschedule_until.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/reschedule_until.cpp	(original)
+++ sandbox/threadpool/libs/tp/examples/reschedule_until.cpp	2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -21,9 +21,23 @@
 class fibo
 {
 private:
-	jss::shared_future< void >		f_;
+	boost::shared_future< void >		f_;
         int								offset_;
 
+	class holder
+	{
+	private:
+		boost::shared_future< void >		f_;
+
+	public:
+		holder( boost::shared_future< void > const& f)
+		: f_( f)
+		{}
+
+		bool operator()()
+		{ return f_.is_ready(); }
+	};
+
         int seq_( int n)
         {
                 if ( n <= 1) return n;
@@ -36,7 +50,10 @@
                 else
                 {
                         if ( n == 7)
-				boost::this_task::reschedule_until< pool_type >( f_);
+			{
+				holder hldr( f_);
+				boost::this_task::reschedule_until( hldr);
+			}
 
                         boost::function< int() > fn1 = boost::bind(
                                                 & fibo::par_,
@@ -53,13 +70,13 @@
                                 boost::this_task::get_thread_pool< pool_type >().submit(
                                         fn2) );
 
-			return t1.get() + t2.get();
+			return t1.result().get() + t2.result().get();
                 }
         }
 
 public:
         fibo(
-		jss::shared_future< void > f,
+		boost::shared_future< void > f,
                 int offset)
         :  f_( f), offset_( offset)
         {}
@@ -78,8 +95,8 @@
         try
         {
                 pool_type pool( tp::poolsize( 1) );
-		jss::packaged_task< void > tsk( boost::bind( f) );
-		jss::shared_future< void > f( tsk.get_future() );
+		boost::packaged_task< void > tsk( boost::bind( f) );
+		boost::shared_future< void > f( tsk.get_future() );
                 fibo fib( f, 3);
                 std::vector< tp::task< int > > results;
                 results.reserve( 40);
@@ -103,7 +120,7 @@
                         std::vector< tp::task< int > >::iterator i( results.begin() );
                         i != e;
                         ++i)
-			std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;
+			std::cout << "fibonacci " << k++ << " == " << i->result().get() << std::endl;
 
                 pt::ptime stop( pt::microsec_clock::universal_time() );
                 std::cout << ( stop - start).total_milliseconds() << " milli seconds" << std::endl;
Added: sandbox/threadpool/libs/tp/src/worker.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/worker.cpp	2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -0,0 +1,189 @@
+#include "boost/tp/detail/worker.hpp"
+
+namespace boost {
+namespace tp {
+namespace detail
+{
+thread_specific_ptr< worker > worker::tss_;
+
+worker::impl::impl( function< void() > const& fn)
+:
+thrd_( new thread( fn) ),
+wsq_(),
+shtdwn_sem_( 0),
+shtdwn_now_sem_( 0),
+shtdwn_( false),
+scns_( 0)
+{ BOOST_ASSERT( ! fn.empty() ); }
+
+const shared_ptr< thread >
+worker::impl::thrd() const
+{ return thrd_; }
+
+const thread::id
+worker::impl::get_id() const
+{ return thrd_->get_id(); }
+
+void
+worker::impl::join() const
+{ thrd_->join(); }
+
+void
+worker::impl::interrupt() const
+{ thrd_->interrupt(); }
+
+void
+worker::impl::put(
+	callable const& ca,
+	interrupter const& intr)
+{
+	BOOST_ASSERT( ! ca.empty() );
+	wsq_.put( std::make_pair( ca, intr) );
+}
+
+bool
+worker::impl::try_take(
+	callable & ca,
+	interrupter & intr)
+{
+	item itm;
+	bool result( wsq_.try_take( itm) );
+	if ( result)
+	{
+		ca = itm.first;
+		intr = itm.second;
+	}
+	return result;
+}
+
+bool
+worker::impl::try_steal(
+	callable & ca,
+	interrupter & intr)
+{
+	item itm;
+	bool result( wsq_.try_steal( itm) );
+	if ( result)
+	{
+		ca = itm.first;
+		intr = itm.second;
+	}
+	return result;
+}
+
+bool
+worker::impl::empty()
+{ return wsq_.empty(); }
+
+void
+worker::impl::signal_shutdown()
+{ shtdwn_sem_.post(); }
+
+void
+worker::impl::signal_shutdown_now()
+{ shtdwn_now_sem_.post(); }
+
+bool
+worker::impl::shutdown()
+{
+	if ( ! shtdwn_)
+		shtdwn_ = shtdwn_sem_.try_wait();
+	return shtdwn_;
+}
+
+bool
+worker::impl::shutdown_now()
+{ return shtdwn_now_sem_.try_wait(); }
+
+std::size_t
+worker::impl::scanns() const
+{ return scns_; }
+
+void
+worker::impl::increment_scanns()
+{ ++scns_; }
+
+void
+worker::impl::reset_scanns()
+{ scns_ = 0; }
+
+const shared_ptr< thread >
+worker::thrd() const
+{ return impl_->thrd(); }
+
+const thread::id
+worker::get_id() const
+{ return impl_->get_id(); }
+
+void
+worker::join() const
+{ impl_->join(); }
+
+void
+worker::interrupt() const
+{ impl_->interrupt(); }
+
+void
+worker::put(
+	callable const& ca,
+	interrupter const& intr)
+{ impl_->put( ca, intr); }
+
+bool
+worker::try_take(
+	callable & ca,
+	interrupter & intr)
+{ return impl_->try_take( ca, intr); }
+
+bool
+worker::try_steal(
+	callable & ca,
+	interrupter & intr)
+{ return impl_->try_steal( ca, intr); }
+
+bool
+worker::empty() const
+{ return impl_->empty(); }
+
+void
+worker::signal_shutdown()
+{ impl_->signal_shutdown(); }
+
+void
+worker::signal_shutdown_now()
+{ impl_->signal_shutdown_now(); }
+
+bool
+worker::shutdown()
+{ return impl_->shutdown(); }
+
+bool
+worker::shutdown_now()
+{ return impl_->shutdown_now(); }
+
+std::size_t
+worker::scanns() const
+{ return impl_->scanns(); }
+
+void
+worker::increment_scanns()
+{ impl_->increment_scanns(); }
+
+void
+worker::reset_scanns()
+{ impl_->reset_scanns(); }
+
+void
+worker::reschedule_until( function< bool() > const& pred)
+{ return impl_->reschedule_until( pred); }
+
+worker *
+worker::tss_get()
+{ return worker::tss_.get(); }
+
+void
+worker::tss_reset( worker * w)
+{ worker::tss_.reset( w); }
+
+} } }
+