$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r55609 - in sandbox/task: boost boost/task boost/task/detail libs/task/examples libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-08-16 04:41:57
Author: olli
Date: 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
New Revision: 55609
URL: http://svn.boost.org/trac/boost/changeset/55609
Log:
 - fiber support enabled
Text files modified: 
   sandbox/task/boost/task.hpp                      |     1                                         
   sandbox/task/boost/task/detail/fiber_posix.hpp   |    60 +++++++-------                          
   sandbox/task/boost/task/detail/fiber_windows.hpp |    73 ++++++++---------                       
   sandbox/task/boost/task/detail/worker.hpp        |   161 ++++++++++++++++----------------------- 
   sandbox/task/boost/task/task.hpp                 |     4                                         
   sandbox/task/boost/task/utility.hpp              |    15 --                                      
   sandbox/task/libs/task/examples/Jamfile.v2       |    13 ++                                      
   sandbox/task/libs/task/src/semaphore_posix.cpp   |     5                                         
   sandbox/task/libs/task/src/semaphore_windows.cpp |     5                                         
   sandbox/task/libs/task/src/worker.cpp            |     4                                         
   10 files changed, 155 insertions(+), 186 deletions(-)
Modified: sandbox/task/boost/task.hpp
==============================================================================
--- sandbox/task/boost/task.hpp	(original)
+++ sandbox/task/boost/task.hpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -27,6 +27,7 @@
 #include <boost/task/static_pool.hpp>
 #include <boost/task/smart.hpp>
 #include <boost/task/task.hpp>
+#include <boost/task/unbounded_buffer.hpp>
 #include <boost/task/unbounded_channel.hpp>
 #include <boost/task/utility.hpp>
 #include <boost/task/watermark.hpp>
Modified: sandbox/task/boost/task/detail/fiber_posix.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_posix.hpp	(original)
+++ sandbox/task/boost/task/detail/fiber_posix.hpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -18,6 +18,7 @@
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
 #include <boost/shared_array.hpp>
+#include <boost/shared_ptr.hpp>
 #include <boost/system/system_error.hpp>
 
 namespace boost { namespace task {
@@ -54,6 +55,18 @@
         shared_array< char >	stack_;
         st_state				state_;
 
+	fiber(
+		function< void() > fn,
+		std::size_t stack_size)
+	:
+	fn_( fn),
+	stack_size_( stack_size),
+	caller_(),
+	callee_(),
+	stack_( new char[stack_size]),
+	state_( st_uninitialized)
+	{ BOOST_ASSERT( stack_size_ > 0); }
+
         bool uninitialized_() const
         { return state_ == st_uninitialized; }
 
@@ -66,7 +79,7 @@
         bool exited_() const
         { return state_ == st_exited; }
 
-	void yield_()
+	void exit_()
         {
                 if ( ::swapcontext( & callee_, & caller_) == -1)
                         throw system::system_error(
@@ -75,11 +88,10 @@
                                         system::system_category) );
         }
 
-	void yield_to_( fiber & to)
+	void switch_to_( fiber & to)
         {
                 std::swap( caller_, to.caller_);
                 std::swap( state_, to.state_);
-
                 if ( ::swapcontext( & callee_, & to.callee_) == -1)
                         throw system::system_error(
                                 system::error_code(
@@ -128,22 +140,15 @@
         }
 
 public:
-	static void convert_thread_to_fiber() {}
+	typedef shared_ptr< fiber >	sptr_t;
 
-	fiber(
+	static void convert_thread_to_fiber()
+	{}
+
+	static sptr_t create(
                 function< void() > fn,
                 std::size_t stack_size)
-	:
-	fn_( fn),
-	stack_size_( stack_size),
-	caller_(),
-	callee_(),
-	stack_( new char[stack_size]),
-	state_( st_uninitialized)
-	{
-		BOOST_ASSERT( ! fn_.empty() );
-		BOOST_ASSERT( stack_size_ > 0);
-	}
+	{ return sptr_t( new fiber( fn, stack_size) ); }
 
         ~fiber()
         { BOOST_ASSERT( ! running_() ); }
@@ -151,25 +156,18 @@
         bool ready() const
         { return uninitialized_() || ready_(); }
 
-    bool running() const
+    	bool running() const
         { return running_(); }
 
-    bool exited() const
+   	 bool exited() const
         { return exited_(); }
 
-	void yield()
-	{
-		BOOST_ASSERT( running_() );
-		state_ = st_ready;
-		yield_();
-		BOOST_ASSERT( running_() );
-	}
-
-	void yield_to( fiber & to)
+	void switch_to( sptr_t & to)
         {
+		BOOST_ASSERT( to);
                 BOOST_ASSERT( running_() );
-		if ( to.uninitialized_() ) to.init_();
-		yield_to_( to);
+		if ( to->uninitialized_() ) to->init_();
+		switch_to_( * to);
                 BOOST_ASSERT( running_() );
         }
 
@@ -187,11 +185,11 @@
         {
                 BOOST_ASSERT( running_() ) ;
                 state_ = st_exited;
-		yield_();
+		exit_();
                 BOOST_ASSERT(!"should never be reached");
         }
 };
-}}}
+} } }
 
 #endif // BOOST_TASK_DETAIL_FIBER_POSIX_H
 
Modified: sandbox/task/boost/task/detail/fiber_windows.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_windows.hpp	(original)
+++ sandbox/task/boost/task/detail/fiber_windows.hpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -17,9 +17,10 @@
 
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
 #include <boost/system/system_error.hpp>
 
-namespace boost { namespace task {
+namespace boost { namespace fibers {
 namespace detail
 {
 template< typename Fiber >
@@ -47,11 +48,22 @@
         friend
         VOID CALLBACK trampoline( LPVOID);
 
-    function< void() >	fn_;
-	std::size_t			stack_size_;
-	LPVOID				caller_;
-	LPVOID				callee_;
-	st_state			state_;
+    function< void( context< fiber > &) >	fn_;
+	std::size_t								stack_size_;
+	LPVOID									caller_;
+	LPVOID									callee_;
+	st_state								state_;
+
+	fiber(
+		function< void() > fn,
+		std::size_t stack_size)
+	:
+	fn_( fn),
+	stack_size_( stack_size),
+	caller_( 0),
+	callee_( 0),
+	state_( st_uninitialized)
+	{ BOOST_ASSERT( stack_size_ > 0); }
 
         bool uninitialized_() const
         { return state_ == st_uninitialized; }
@@ -65,14 +77,13 @@
         bool exited_() const
         { return state_ == st_exited; }
 
-	void yield_()
+	void exit_()
         { ::SwitchToFiber( caller_); }
 
-	void yield_to_( fiber & to)
+	void switch_to_( fiber & to)
         {
                 std::swap( caller_, to.caller_);
                 std::swap( state_, to.state_);
-
                 ::SwitchToFiber( to.callee_);
         }
 
@@ -103,29 +114,21 @@
         }
 
 public:
+	typedef shared_ptr< fiber >		sptr_t;
+
         static void convert_thread_to_fiber()
         {
-		if ( ! ::ConvertThreadToFiber( 0) )
-			throw system::system_error(
-				system::error_code(
-					::GetLastError(),
-					system::system_category) );
-	
+			if ( ! ::ConvertThreadToFiber( 0)_)
+				throw system::system_error(
+					system::error_code(
+						::GetLastError(),
+						system::system_category) );
         }
 
-	fiber(
+	static sptr-t create(
                 function< void() > fn,
                 std::size_t stack_size)
-	:
-	fn_( fn),
-	stack_size_( stack_size),
-	caller_( 0),
-	callee_( 0),
-	state_( st_uninitialized)
-	{
-		BOOST_ASSERT( ! fn_.empty() );
-		BOOST_ASSERT( stack_size_ > 0);
-	}
+	{ return sptr_t( new fiber( fn, stack_size) ); }
 
         ~fiber()
         {
@@ -142,19 +145,11 @@
     bool exited() const
         { return exited_(); }
 
-	void yield()
-	{
-		BOOST_ASSERT( running_() );
-		state_ = st_ready;
-		yield_();
-		BOOST_ASSERT( running_() );
-	}
-
-	void yield_to( fiber & to)
+	void switch_to( sptr_t & to)
         {
                 BOOST_ASSERT( running_() );
-		if ( to.uninitialized_() ) to.init_();
-		yield_to_( to);
+		if ( to->uninitialized_() ) to->init_();
+		switch_to_( * to);
                 BOOST_ASSERT( running_() );
         }
 
@@ -172,11 +167,11 @@
         {
                 BOOST_ASSERT( running_() || ready_() ) ;
                 state_ = st_exited;
-		yield_();
+		exit_();
                 BOOST_ASSERT(!"should never be reached");
         }
 };
-}}}
+} } }
 
 #endif // BOOST_TASK_DETAIL_FIBER_WINDOWS_H
 
Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp	(original)
+++ sandbox/task/boost/task/detail/worker.hpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -9,6 +9,7 @@
 
 #include <cstddef>
 #include <list>
+#include <set>
 #include <utility>
 
 #include <boost/assert.hpp>
@@ -62,7 +63,7 @@
 
         virtual void reschedule_until( function< bool() > const&) = 0;
 
-	virtual bool block() = 0;
+	virtual void block() = 0;
 };
 
 template<
@@ -92,15 +93,14 @@
                 { return die_(); }
         };
 
-	typedef shared_ptr< fiber >		fiber_t;
         typedef shared_ptr< thread >	thread_t;
 
         Pool					&	pool_;
         thread_t					thrd_;
-	fiber_t						fib_;
+	fiber::sptr_t				fib_;
         wsq							wsq_;
-	std::list< fiber_t >		blocked_fibers_;
-	std::list< fiber_t >		runnable_fiber_lst;
+	std::list< fiber::sptr_t >	blocked_fibers_;
+	std::list< fiber::sptr_t >	runnable_fibers_;
         semaphore					shtdwn_sem_;
         semaphore					shtdwn_now_sem_;
         bool						shtdwn_;
@@ -124,7 +124,16 @@
 
         void try_blocked_fibers_()
         {
-		td::list< fiber_t > 	
+		if ( ! blocked_fibers_.empty() )
+		{
+			fiber::sptr_t this_fib = fib_;
+			runnable_fibers_.push_back( this_fib);
+			fiber::sptr_t blocked_fib = blocked_fibers_.front();
+			blocked_fibers_.pop_front();
+			fib_ = blocked_fib;
+			this_fib->switch_to( blocked_fib);
+			fib_ = this_fib;
+		}
         }
 
         bool take_global_callable_(
@@ -152,49 +161,52 @@
                 return false;
         }
 
-	void run_()
+	void process_( bool all)
         {
                 callable ca;
-		while ( ! shutdown_() )
+		if ( all ? try_take_local_callable_( ca) || 
+					try_steal_other_callable_( ca) ||
+					try_take_global_callable_( ca)
+				 : try_take_local_callable_( ca) )
                 {
-			try_runnable_fibers_();
-			if ( try_take_local_callable_( ca) || 
-				 try_take_global_callable_( ca) ||
-				 try_steal_other_callable_( ca) )
-			{
-				execute_( ca);
-				scns_ = 0;
-			}
-			else
+			execute_( ca);
+			scns_ = 0;
+		}
+		else
+		{
+			guard grd( pool_.idle_worker_);
+			++scns_;
+			if ( scns_ >= max_scns_)
                         {
-				guard grd( pool_.idle_worker_);
-				if ( shutdown_() ) return;
-				++scns_;
-				if ( scns_ >= max_scns_)
+				if ( pool_.size_() > pool_.idle_worker_)
+				{
+					if ( take_global_callable_( ca, asleep_) )
+						execute_( ca);
+				}
+				else if ( blocked_fibers_.empty() )
                                 {
-					// should the comparation be atomic or
-					// at least the read of idle_worker_ be atomic ?
-					if ( pool_.size_() == pool_.idle_worker_)
-					{
-						if ( take_global_callable_( ca, asleep_) )
-							execute_( ca);
-					}
-					else
-						try
-						{ this_thread::sleep( asleep_); }
-						catch ( thread_interrupted const&)
-						{ return; }
-					scns_ = 0;
+					try
+					{ this_thread::sleep( asleep_); }
+					catch ( thread_interrupted const&)
+					{ return; }
                                 }
-				else
-					this_thread::yield();
+				scns_ = 0;
                         }
+			else
+				this_thread::yield();
                 }
+		try_blocked_fibers_();
+	}
+
+	void run_()
+	{
+		while ( ! shutdown_() )
+			process_( true);
         }
 
         bool shutdown_()
         {
-		if ( shutdown__() && get_pool().channel_.empty() )
+		if ( shutdown__() && pool_.channel_.empty() && blocked_fibers_.empty() )
                         return true;
                 else if ( shutdown_now__() )
                         return true;
@@ -225,7 +237,7 @@
         fib_(),
         wsq_(),
         blocked_fibers_(),
-	runnable_fiber_lst(),
+	runnable_fibers_(),
         shtdwn_sem_( 0),
         shtdwn_now_sem_( 0),
         shtdwn_( false),
@@ -261,76 +273,47 @@
         bool try_steal( callable & ca)
         { return wsq_.try_steal( ca); }
 
-	Pool & get_pool() const
-	{ return pool_; }
-
         void run()
         {
                 BOOST_ASSERT( get_id() == this_thread::get_id() );
 
                 fiber::convert_thread_to_fiber();
 
-		fiber_t fib(
-			new fiber(
-				bind(
-					& worker_object::run_,
-					this),
+		fiber::sptr_t fib(
+			fiber::create(
+				bind( & worker_object::run_, this),
                                 stack_size_) );
                 fib_.swap( fib);
                 fib_->run();
-		BOOST_ASSERT( fib_->exited() );
                 fib_.reset();
         }
 
         void reschedule_until( function< bool() > const& pred)
         {
-		callable ca;
-		while ( ! pred() /* && ! shutdown_() */)
-		{
-			if ( try_take_local_callable_( ca) )
-			{
-				execute_( ca);
-				scns_ = 0;
-			}
-			else
-			{
-				guard grd( get_pool().idle_worker_);
-				if ( shutdown_() ) return;
-				++scns_;
-				if ( scns_ >= max_scns_)
-				{
-					this_thread::sleep( asleep_);
-					scns_ = 0;
-				}
-				else
-					this_thread::yield();
-			}
-		}
+		while ( ! pred() )
+			process_( false);
         }
 
-	bool block()
+	void block()
         {
-		blocked_fibers_.push_back( fib_);
-		fiber_t fib;
+		fiber::sptr_t this_fib = fib_;
+		blocked_fibers_.push_back( this_fib);
+		fiber::sptr_t runnable_fib;
                 if ( runnable_fibers_.empty() )
                 {
-			fib.reset(
-				new fiber(
-					bind(
-						& worker_object::run_,
-						this),
-					stack_size_) );
+			runnable_fib = fiber::create(
+					bind( & worker_object::run_, this),
+					stack_size_);
                 }
                 else
                 {
-			fib = runnable_fibers_.front();
+			runnable_fib = runnable_fibers_.front();
                         runnable_fibers_.pop_front();
                 }
-		fib_.swap( fib);
-		fib_->run();
-		runnable_fibers_.push_back( fib);
-		
-		return ! shutdown_();
+		BOOST_ASSERT( runnable_fib);
+		fib_ = runnable_fib;
+		this_fib->switch_to( runnable_fib);
+		fib_ = this_fib;
         }
 };
 
@@ -371,17 +354,9 @@
         void put( callable const&);
         bool try_steal( callable &);
 
-	template< typename Pool >
-	Pool & get_pool() const
-	{
-		worker_object< Pool, worker > * p( dynamic_cast< worker_object< Pool, worker > * >( impl_.get() ) );
-		BOOST_ASSERT( p);
-		return p->get_pool();
-	}
-
         void run();
         void reschedule_until( function< bool() > const&);
-	bool block();
+	void block();
 
         static worker * tss_get();
 };
Modified: sandbox/task/boost/task/task.hpp
==============================================================================
--- sandbox/task/boost/task/task.hpp	(original)
+++ sandbox/task/boost/task/task.hpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -70,6 +70,8 @@
                 { throw task_already_executed(); }
                 catch ( thread_interrupted const&)
                 { this->prom_.set_exception( copy_exception( task_interrupted() ) ); }
+		catch ( task_interrupted const& e)
+		{ this->prom_.set_exception( copy_exception( e) ); }
                 catch ( boost::exception const& e)
                 { this->prom_.set_exception( copy_exception( e) ); }
                 catch ( std::ios_base::failure const& e)
@@ -127,6 +129,8 @@
                 { throw task_already_executed(); }
                 catch ( thread_interrupted const&)
                 { this->prom_.set_exception( copy_exception( task_interrupted() ) ); }
+		catch ( task_interrupted const& e)
+		{ this->prom_.set_exception( copy_exception( e) ); }
                 catch ( boost::exception const& e)
                 { this->prom_.set_exception( copy_exception( e) ); }
                 catch ( std::ios_base::failure const& e)
Modified: sandbox/task/boost/task/utility.hpp
==============================================================================
--- sandbox/task/boost/task/utility.hpp	(original)
+++ sandbox/task/boost/task/utility.hpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -9,7 +9,6 @@
 
 #include <boost/assert.hpp>
 #include <boost/thread.hpp>
-#include <boost/thread/thread_time.hpp>
 
 #include <boost/task/detail/worker.hpp>
 
@@ -17,12 +16,12 @@
 
 namespace boost { namespace this_task
 {
-template< typename Pool >
-Pool & get_pool()
+inline
+void block()
 {
         task::detail::worker * w( task::detail::worker::tss_get() );
         BOOST_ASSERT( w);
-	return w->get_pool< Pool >();
+	w->block();
 }
 
 inline
@@ -30,14 +29,6 @@
 { return task::detail::worker::tss_get() != 0; }
 
 inline
-bool block()
-{
-	task::detail::worker * w( task::detail::worker::tss_get() );
-	BOOST_ASSERT( w);
-	return w->block();
-}
-
-inline
 thread::id worker_id()
 {
         task::detail::worker * w( task::detail::worker::tss_get() );
Modified: sandbox/task/libs/task/examples/Jamfile.v2
==============================================================================
--- sandbox/task/libs/task/examples/Jamfile.v2	(original)
+++ sandbox/task/libs/task/examples/Jamfile.v2	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -25,10 +25,21 @@
     ;
 
 exe bind_to_processors : bind_to_processors.cpp ;
+exe buffer_multi : buffer_multi.cpp ;
+exe buffer_multi2 : buffer_multi2.cpp ;
+exe buffer_pool : buffer_pool.cpp ;
+exe buffer_pool_thread : buffer_pool_thread.cpp ;
+exe buffer_thread : buffer_thread.cpp ;
 exe fork_join : fork_join.cpp ;
 exe interrupt : interrupt.cpp ;
+exe no_deadlock_pool : no_deadlock_pool.cpp ;
+exe no_deadlock_pool2 : no_deadlock_pool2.cpp ;
+exe no_deadlock_pool3 : no_deadlock_pool3.cpp ;
 exe pending : pending.cpp ;
 exe priority : priority.cpp ;
-exe shutdonw_now : shutdown_now.cpp ;
+exe semaphore_thread : semaphore_thread.cpp ;
+exe semaphore_pool : semaphore_pool.cpp ;
+exe semaphore_pool_thread : semaphore_pool_thread.cpp ;
+exe shutdown_now : shutdown_now.cpp ;
 exe smart : smart.cpp ;
 exe submit : submit.cpp ;
Modified: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_posix.cpp	(original)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -41,10 +41,7 @@
         if ( this_task::runs_in_pool() )
         {
                 while ( ! try_wait() )
-		{
-			if ( ! this_task::block() )
-				throw task_interrupted();
-		}
+			this_task::block();
         }
         else
         {
Modified: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_windows.cpp	(original)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -41,10 +41,7 @@
         if ( this_task::runs_in_pool() )
         {
                 while ( ! try_wait() )
-		{
-			if ( ! this_task::block() )
-				throw task_interrupted();
-		}
+			this_task::block();
         }
         else
         {
Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp	(original)
+++ sandbox/task/libs/task/src/worker.cpp	2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -52,9 +52,9 @@
 worker::reschedule_until( function< bool() > const& pred)
 { impl_->reschedule_until( pred); }
 
-bool
+void
 worker::block()
-{ return impl_->block(); }
+{ impl_->block(); }
 
 worker *
 worker::tss_get()