<?php $include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r51476 - in sandbox/threadpool/boost/tp: . detail
From: oliver.kowalke_at_[hidden]
Date: 2009-02-27 16:04:09
Author: olli
Date: 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
New Revision: 51476
URL: http://svn.boost.org/trac/boost/changeset/51476
Log:
* rescheduling of current task until condition evaluates to true
* boost::this_task namespace introduced
* boost::this_task::reschedule_until() in order to synchronize with other sources
Added:
   sandbox/threadpool/boost/tp/detail/pool_base.hpp   (contents, props changed)
   sandbox/threadpool/boost/tp/detail/worker_base.hpp   (contents, props changed)
Text files modified: 
   sandbox/threadpool/boost/tp/pool.hpp |   117 +++++++++++++++++++++++++-------------- 
   1 files changed, 74 insertions(+), 43 deletions(-)
Added: sandbox/threadpool/boost/tp/detail/pool_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/pool_base.hpp	2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -0,0 +1,27 @@
+//  Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+//  Software License, Version 1.0. (See accompanying file
+//  LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_POOL_BASE_H
+#define BOOST_TP_DETAIL_POOL_BASE_H
+
+#include <boost/thread/tss.hpp>
+
+#include <boost/tp/detail/worker_base.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+template< typename Tag >	// Tag is a dummy: a class template's static member may be defined in a header
+struct pool_base_tss { static thread_specific_ptr< worker_base > tss_worker_; };
+
+template< typename Tag >	// merged across translation units, unlike the former non-template definition
+thread_specific_ptr< worker_base > pool_base_tss< Tag >::tss_worker_;
+
+struct pool_base : pool_base_tss< void > {};	// pool_base::tss_worker_: worker registered for the calling thread
+
+} } }
+
+#endif // BOOST_TP_DETAIL_POOL_BASE_H
+
Added: sandbox/threadpool/boost/tp/detail/worker_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker_base.hpp	2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -0,0 +1,67 @@
+//  Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+//  Software License, Version 1.0. (See accompanying file
+//  LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_BASE_H
+#define BOOST_TP_DETAIL_WORKER_BASE_H
+
+#include <cstddef>
+
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct worker_base // abstract per-worker interface; pool< Channel >::worker derives from it
+{	// instances are held per thread through pool_base::tss_worker_
+	virtual ~worker_base() {}
+	// thread object behind this worker; pool asserts the returned pointer is non-null
+	virtual const shared_ptr< thread > thrd() const = 0;
+	// id of that thread; pool::entry_ asserts it equals this_thread::get_id()
+	virtual const thread::id get_id() const = 0;
+	// presumably joins the worker's thread -- implementation not shown here, confirm
+	virtual void join() const = 0;
+	// presumably interrupts the worker's thread -- implementation not shown here, confirm
+	virtual void interrupt() const = 0;
+	// hands a callable and its interrupter to this worker; pool uses it when the submitting thread has a worker registered
+	virtual void put(
+		callable const& ca,
+		interrupter const& intr) = 0;
+	// tries to fetch work into ca/intr; when it returns false pool falls back to its channel
+	virtual bool try_take(
+		callable & ca,
+		interrupter & intr) = 0;
+	// like try_take by signature; presumably used to take work from another worker -- confirm
+	virtual bool try_steal(
+		callable & ca,
+		interrupter & intr) = 0;
+	// NOTE(review): presumably reports whether the worker holds no queued work -- confirm
+	virtual bool empty() const = 0;
+	// request side of shutdown(); exact signalling is implementation-defined
+	virtual void signal_shutdown() = 0;
+	// request side of shutdown_now(); exact signalling is implementation-defined
+	virtual void signal_shutdown_now() = 0;
+	// polled by pool::shutdown_; a true result only ends the worker loop once the channel is empty
+	virtual bool shutdown() = 0;
+	// polled by pool::shutdown_; a true result ends the worker loop regardless of the channel
+	virtual bool shutdown_now() = 0;
+	// scan counter; pool compares it with its own limit to choose between yielding and sleeping/blocking
+	virtual std::size_t scanns() const = 0;
+	// called by pool after an unsuccessful search for work
+	virtual void increment_scanns() = 0;
+	// called by pool after executing a callable or after sleeping/blocking
+	virtual void reset_scanns() = 0;
+	// pool's implementation forwards to pool::reschedule_until_, which loops until cond() returns true
+	virtual void reschedule_until( function< bool() > const&) = 0;
+};
+
+}}}
+
+#endif // BOOST_TP_DETAIL_WORKER_BASE_H
+
Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp	(original)
+++ sandbox/threadpool/boost/tp/pool.hpp	2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -40,6 +40,8 @@
 #ifdef BOOST_BIND_WORKER_TO_PROCESSORS
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
+#include <boost/tp/detail/pool_base.hpp>
+#include <boost/tp/detail/worker_base.hpp>
 #include <boost/tp/detail/wsq.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/poolsize.hpp>
@@ -61,8 +63,8 @@
 namespace tp
 {
 template< typename Channel >
-class pool
-: private noncopyable
+class pool : private noncopyable,
+             private detail::pool_base
 {
 private:
         typedef Channel						channel;
@@ -75,7 +77,7 @@
                 terminated_state
         };
 
-	class worker
+	class worker : public detail::worker_base
         {
         private:
                 class wimpl : private noncopyable
@@ -83,6 +85,7 @@
                 private:
                         typedef std::pair< detail::callable, detail::interrupter >	item;
 
+			pool							*		pool_ptr_;
                         shared_ptr< thread >					thrd_;
                         detail::wsq< item > 					wsq_;
                         interprocess::interprocess_semaphore	shtdwn_sem_;
@@ -91,8 +94,11 @@
                         std::size_t								scns_;
 
                 public:
-			wimpl( function< void() > const& fn)
+			wimpl(
+				pool * pool_ptr,
+				function< void() > const& fn)
                         :
+			pool_ptr_( pool_ptr),
                         thrd_( new thread( fn) ),
                         wsq_(),
                         shtdwn_sem_( 0),
@@ -176,13 +182,18 @@
         
                         void reset_scanns()
                         { scns_ = 0; }
+
+			void reschedule_until( function< bool() > const& cond)	// cond: predicate handed on unchanged
+			{ pool_ptr_->reschedule_until_( cond); }	// forwards to the pool whose pointer was stored at construction
                 };
 
                 shared_ptr< wimpl >	impl_;
 
         public:
-		worker( function< void() > const& fn)
-		: impl_( new wimpl( fn) )
+		worker(
+			pool * pool_ptr,
+			function< void() > const& fn)
+		: impl_( new wimpl( pool_ptr, fn) )
                 {}
 
                 const shared_ptr< thread > thrd() const
@@ -235,6 +246,9 @@
 
                 void reset_scanns()
                 { impl_->reset_scanns(); }
+
+		void reschedule_until( function< bool() > const& cond)	// worker_base interface; cond is handed on unchanged
+		{ impl_->reschedule_until( cond); }	// delegates to the shared wimpl
         };
 
         struct id_idx_tag {};
@@ -279,10 +293,10 @@
                 { return die_(); }
         };
 
+	static thread_specific_ptr< random_idx >	tss_rnd_idx_;
+
         worker_list								worker_;
         shared_mutex							mtx_worker_;
-	thread_specific_ptr< worker >			tss_worker_;
-	thread_specific_ptr< random_idx >		tss_rnd_idx_;
         state									state_;
         shared_mutex							mtx_state_;
         channel		 							channel_;
@@ -293,7 +307,6 @@
         volatile uint32_t						running_worker_;
 
         void execute_(
-		worker & w,
                 detail::callable & ca,
                 detail::interrupter & intr,
                 shared_ptr< thread > const& thrd)
@@ -311,10 +324,10 @@
                 BOOST_ASSERT( ca.empty() );
         }
 
-	void next_callable_( worker & w, detail::callable & ca, detail::interrupter & intr)
+	void next_callable_( detail::worker_base * w, detail::callable & ca, detail::interrupter & intr)
         {
                 rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
-		if ( ! w.try_take( ca, intr) )
+		if ( ! w->try_take( ca, intr) )
                 {
                         if ( ! channel_.try_take( ca, intr) )
                         {
@@ -331,14 +344,14 @@
                                 {
                                         detail::guard grd( idle_worker_);
                                         if ( shutdown_( w) ) return;
-					w.increment_scanns();
-					if ( w.scanns() >= scns_)
+					w->increment_scanns();
+					if ( w->scanns() >= scns_)
                                         {
                                                 if ( size_() == idle_worker_)
                                                         channel_.take( ca, intr, asleep_);
                                                 else
                                                         this_thread::sleep( asleep_);
-						w.reset_scanns();
+						w->reset_scanns();
                                         }
                                         else
                                                 this_thread::yield();
@@ -347,21 +360,20 @@
                 }
         }
 
-	template< typename R >
-	void re_schedule_until_( jss::shared_future< R > const& f)
+	void reschedule_until_( function< bool() > const& cond)
         {
-		worker * w( tss_worker_.get() );
+		detail::worker_base * w( pool_base::tss_worker_.get() );
                 BOOST_ASSERT( w);
                 shared_ptr< thread > thrd( w->thrd() );
                 BOOST_ASSERT( thrd);
                 detail::interrupter intr;
                 detail::callable ca;
-		while ( ! f.is_ready() )
+		while ( ! cond() )
                 {
-			next_callable_( * w, ca, intr);
+			next_callable_( w, ca, intr);
                         if( ! ca.empty() )
                         {
-				execute_( *w, ca, intr, thrd);
+				execute_( ca, intr, thrd);
                                 w->reset_scanns();
                         }
                 }
@@ -377,22 +389,22 @@
 
                 worker w( * i);
                 BOOST_ASSERT( w.get_id() == this_thread::get_id() );
-		tss_worker_.reset( new worker( w) );
+		pool_base::tss_worker_.reset( new worker( w) );
                 shared_ptr< thread > thrd( w.thrd() );
                 BOOST_ASSERT( thrd);
                 detail::callable ca;
                 detail::interrupter intr;
 
-		tss_rnd_idx_.reset( new random_idx( worker_) );
+		pool::tss_rnd_idx_.reset( new random_idx( worker_) );
 
                 detail::guard grd( running_worker_);
 
-		while ( ! shutdown_( w) )
+		while ( ! shutdown_( & w) )
                 {
-			next_callable_( w, ca, intr);
+			next_callable_( & w, ca, intr);
                         if( ! ca.empty() )
                         {
-				execute_( w, ca, intr, thrd);
+				execute_( ca, intr, thrd);
                                 w.reset_scanns();
                         }
                 }
@@ -403,6 +415,7 @@
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
                 worker_.insert(
                         worker(
+				this,
                                 boost::bind(
                                         & pool::entry_,
                                         this) ) );
@@ -419,6 +432,7 @@
         {
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
                 worker w(
+				this,
                                 boost::bind(
                                         & pool::entry_,
                                         this,
@@ -443,11 +457,11 @@
         bool terminateing_() const
         { return state_ == terminateing_state; }
 
-	bool shutdown_( worker & w)
+	bool shutdown_( detail::worker_base * w)
         {
-		if ( w.shutdown() && channel_.empty() )
+		if ( w->shutdown() && channel_.empty() )
                         return true;
-		else if ( w.shutdown_now() )
+		else if ( w->shutdown_now() )
                         return true;
                 return false;
         }
@@ -460,8 +474,6 @@
         :
         worker_(),
         mtx_worker_(),
-	tss_worker_(),
-	tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(),
@@ -489,8 +501,6 @@
         :
         worker_(),
         mtx_worker_(),
-	tss_worker_(),
-	tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(
@@ -518,8 +528,6 @@
         :
         worker_(),
         mtx_worker_(),
-	tss_worker_(),
-	tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(),
@@ -548,8 +556,6 @@
         :
         worker_(),
         mtx_worker_(),
-	tss_worker_(),
-	tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(
@@ -679,14 +685,18 @@
                 detail::interrupter intr;
                 jss::packaged_task< R > tsk( act);
                 jss::shared_future< R > fut( tsk.get_future() );
-		worker * w( tss_worker_.get() );
+		detail::worker_base * w( pool_base::tss_worker_.get() );
                 if ( w)
                 {
+			function< bool() > cond(
+				bind(
+					& jss::shared_future< R >::is_ready,
+					fut) );
                         tsk.set_wait_callback(
                                 bind(
-					( void ( pool< Channel >::*)( jss::shared_future< R > const&) ) & pool< Channel >::re_schedule_until_,
+					( void ( pool< Channel >::*)( function< bool() > const&) ) & pool< Channel >::reschedule_until_,
                                         this,
-					fut) );
+					cond) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( fut, intr);
                 }
@@ -716,14 +726,18 @@
                 detail::interrupter intr;
                 jss::packaged_task< R > tsk( act);
                 jss::shared_future< R > fut( tsk.get_future() );
-		worker * w( tss_worker_.get() );
+		detail::worker_base * w( pool_base::tss_worker_.get() );
                 if ( w)
                 {
+			function< bool() > cond(
+				bind(
+					& jss::shared_future< R >::is_ready,
+					fut) );
                         tsk.set_wait_callback(
                                 bind(
-					( void ( pool< Channel >::*)( jss::shared_future< R > const&) ) & pool< Channel >::re_schedule_until_,
+					( void ( pool< Channel >::*)( function< bool() > const&) ) & pool< Channel >::reschedule_until_,
                                         this,
-					fut) );
+					cond) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( fut, intr);
                 }
@@ -742,7 +756,24 @@
         }
 };
 
-} }
+template< typename Channel >	// out-of-class definition of the static member declared inside pool
+thread_specific_ptr< typename pool< Channel >::random_idx >
+pool< Channel >::tss_rnd_idx_;	// static, so shared by all pool objects of the same Channel type
+
+}
+
+namespace this_task	// free functions acting on the worker registered for the calling thread
+{
+	inline
+	void reschedule_until( function< bool() > const& cond)	// cond: predicate handed to that worker's reschedule_until
+	{
+		tp::detail::worker_base * w( tp::detail::pool_base::tss_worker_.get() );	// thread-specific worker pointer
+		BOOST_ASSERT ( w);	// only guard against a missing worker; w is dereferenced below without any other check
+		w->reschedule_until( cond);
+	}
+}
+
+}
 
 #endif // BOOST_TP_POOL_H