$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r86161 - in trunk: boost/thread libs/thread/example libs/thread/test
From: vicente.botet_at_[hidden]
Date: 2013-10-04 16:05:26
Author: viboes
Date: 2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013)
New Revision: 86161
URL: http://svn.boost.org/trac/boost/changeset/86161
Log:
Thread: Added a polymorphic executor and an executor_adaptor.
Added:
   trunk/boost/thread/executor.hpp   (contents, props changed)
   trunk/libs/thread/example/executor.cpp   (contents, props changed)
Text files modified: 
   trunk/boost/thread/executor.hpp        |   175 ++++++++++++++++++++++++++++++++++++++++
   trunk/boost/thread/thread_pool.hpp     |    50 ++++++++---                             
   trunk/libs/thread/example/executor.cpp |   149 ++++++++++++++++++++++++++++++++++      
   trunk/libs/thread/test/Jamfile.v2      |     4                                         
   4 files changed, 361 insertions(+), 17 deletions(-)
Added: trunk/boost/thread/executor.hpp
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ trunk/boost/thread/executor.hpp	2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013)	(r86161)
@@ -0,0 +1,175 @@
+// Copyright (C) 2013 Vicente J. Botet Escriba
+//
+//  Distributed under the Boost Software License, Version 1.0. (See accompanying
+//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// 2013/09 Vicente J. Botet Escriba
+//    Adapt to boost from CCIA C++11 implementation
+
+#ifndef BOOST_THREAD_EXECUTOR_HPP
+#define BOOST_THREAD_EXECUTOR_HPP
+
+#include <boost/thread/detail/config.hpp>
+
+
+#include <boost/thread/detail/delete.hpp>
+#include <boost/thread/detail/move.hpp>
+#include <boost/thread/detail/function_wrapper.hpp>
+
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+
+  class executor
+  {
+  public:
+    /// type-erasure to store the works to do
+    typedef  detail::function_wrapper work;
+
+    /// executor is not copyable.
+    BOOST_THREAD_NO_COPYABLE(executor)
+    executor() {}
+
+    /**
+     * \b Effects: Destroys the executor.
+     *
+     * \b Synchronization: The completion of all the closures happen before the completion of the executor destructor.
+     */
+    virtual ~executor() {};
+
+    /**
+     * \b Effects: close the \c executor for submissions.
+     * The worker threads will work until there is no more closures to run.
+     */
+    virtual void close() = 0;
+
+    /**
+     * \b Returns: whether the pool is closed for submissions.
+     */
+    virtual bool closed() = 0;
+
+    /**
+     * \b Effects: The specified closure will be scheduled for execution at some point in the future.
+     * If invoked closure throws an exception the executor will call std::terminate, as is the case with threads.
+     *
+     * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+     *
+     * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
+     * Whatever exception that can be throw while storing the closure.
+     */
+    virtual void submit(BOOST_THREAD_RV_REF(work) closure) = 0;
+
+    /**
+     * \b Requires: \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible.
+     *
+     * \b Effects: The specified closure will be scheduled for execution at some point in the future.
+     * If invoked closure throws an exception the thread pool will call std::terminate, as is the case with threads.
+     *
+     * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+     *
+     * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
+     * Whatever exception that can be throw while storing the closure.
+     */
+//#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
+    template <typename Closure>
+    void submit(Closure const& closure)
+    {
+      work w ((closure));
+      submit(boost::move(w));
+    }
+//#endif
+
+    template <typename Closure>
+    void submit(BOOST_THREAD_RV_REF(Closure) closure)
+    {
+      work w = boost::move(closure);
+      submit(boost::move(w));
+    }
+
+    /**
+     * Effects: try to execute one task.
+     * Returns: whether a task has been executed.
+     * Throws: whatever the current task constructor throws or the task() throws.
+     */
+    virtual bool try_executing_one() = 0;
+
+    /**
+     * \b Requires: This must be called from an scheduled task.
+     *
+     * \b Effects: reschedule functions until pred()
+     */
+    template <typename Pred>
+    bool reschedule_until(Pred const& pred)
+    {
+      do {
+        //schedule_one_or_yield();
+        if ( ! try_executing_one())
+        {
+          return false;
+        }
+      } while (! pred());
+      return true;
+    }
+  };
+  template <typename Executor>
+  class executor_adaptor : public executor
+  {
+    Executor ex;
+  public:
+    /// type-erasure to store the works to do
+    typedef  detail::function_wrapper work;
+
+    /// executor is not copyable.
+    BOOST_THREAD_NO_COPYABLE(executor_adaptor)
+
+    /**
+     * executor_adaptor constructor
+     */
+    executor_adaptor() : ex() {}
+
+    /**
+     * executor_adaptor constructor
+     */
+#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES)
+    template <typename ...Args>
+    executor_adaptor(BOOST_THREAD_RV_REF(Args) ... args) : ex(boost::forward<Args>(args)...) {}
+#endif
+    /**
+     * \b Effects: close the \c executor for submissions.
+     * The worker threads will work until there is no more closures to run.
+     */
+    void close() { ex.close(); }
+
+    /**
+     * \b Returns: whether the pool is closed for submissions.
+     */
+    bool closed() { return ex.closed(); }
+
+    /**
+     * \b Effects: The specified closure will be scheduled for execution at some point in the future.
+     * If invoked closure throws an exception the executor will call std::terminate, as is the case with threads.
+     *
+     * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+     *
+     * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
+     * Whatever exception that can be throw while storing the closure.
+     */
+    void submit(BOOST_THREAD_RV_REF(work) closure)  { return ex.submit(boost::move(closure)); }
+
+
+    /**
+     * Effects: try to execute one task.
+     * Returns: whether a task has been executed.
+     * Throws: whatever the current task constructor throws or the task() throws.
+     */
+    bool try_executing_one() { return ex.try_executing_one(); }
+
+  };
+
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
Modified: trunk/boost/thread/thread_pool.hpp
==============================================================================
--- trunk/boost/thread/thread_pool.hpp	Fri Oct  4 15:02:11 2013	(r86160)
+++ trunk/boost/thread/thread_pool.hpp	2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013)	(r86161)
@@ -52,6 +52,7 @@
     /// A move aware vector
     thread_vector threads;
 
+  public:
     /**
      * Effects: try to execute one task.
      * Returns: whether a task has been executed.
@@ -78,6 +79,7 @@
         return false;
       }
     }
+  private:
     /**
      * Effects: schedule one task or yields
      * Throws: whatever the current task constructor throws or the task() throws.
@@ -89,15 +91,19 @@
           this_thread::yield();
         }
     }
+
     /**
      * The main loop of the worker threads
      */
     void worker_thread()
     {
-      while (!is_closed())
+      while (!closed())
       {
         schedule_one_or_yield();
       }
+      while (try_executing_one())
+      {
+      }
     }
 
   public:
@@ -105,7 +111,9 @@
     BOOST_THREAD_NO_COPYABLE(thread_pool)
 
     /**
-     * Effects: creates a thread pool that runs closures on @c thread_count threads.
+     * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
+     *
+     * \b Throws: Whatever exception is thrown while initializing the needed resources.
      */
     thread_pool(unsigned const thread_count = thread::hardware_concurrency())
     {
@@ -123,8 +131,9 @@
       }
     }
     /**
-     * Effects: Destroys the thread pool.
-     * Synchronization: The completion of all the closures happen before the completion of the thread pool destructor.
+     * \b Effects: Destroys the thread pool.
+     *
+     * \b Synchronization: The completion of all the closures happen before the completion of the \c thread_pool destructor.
      */
     ~thread_pool()
     {
@@ -134,7 +143,8 @@
     }
 
     /**
-     * Effects: close the thread_pool for submissions. The worker threads will work until
+     * \b Effects: close the \c thread_pool for submissions.
+     * The worker threads will work until there is no more closures to run.
      */
     void close()
     {
@@ -142,19 +152,23 @@
     }
 
     /**
-     * Returns: whether the pool is closed for submissions.
+     * \b Returns: whether the pool is closed for submissions.
      */
-    bool is_closed()
+    bool closed()
     {
       return work_queue.closed();
     }
 
     /**
-     * Effects: The specified function will be scheduled for execution at some point in the future.
-     * If invoking closure throws an exception the thread pool will call std::terminate, as is the case with threads.
-     * Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
-     * Throws: sync_queue_is_closed if the thread pool is closed.
+     * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
+     *
+     * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
+     * If invoked closure throws an exception the \c thread_pool will call \c std::terminate, as is the case with threads.
      *
+     * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
+     *
+     * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
+     * Whatever exception that can be throw while storing the closure.
      */
     template <typename Closure>
     void submit(Closure const& closure)
@@ -172,15 +186,21 @@
     }
 
     /**
-     * This must be called from an scheduled task.
-     * Effects: reschedule functions until pred()
+     * \b Requires: This must be called from an scheduled task.
+     *
+     * \b Effects: reschedule functions until pred()
      */
     template <typename Pred>
-    void reschedule_until(Pred const& pred)
+    bool reschedule_until(Pred const& pred)
     {
       do {
-        schedule_one_or_yield();
+        //schedule_one_or_yield();
+        if ( ! try_executing_one())
+        {
+          return false;
+        }
       } while (! pred());
+      return true;
     }
 
   };
Added: trunk/libs/thread/example/executor.cpp
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ trunk/libs/thread/example/executor.cpp	2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013)	(r86161)
@@ -0,0 +1,149 @@
+// Copyright (C) 2012-2013 Vicente Botet
+//
+//  Distributed under the Boost Software License, Version 1.0. (See accompanying
+//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#define BOOST_THREAD_VERSION 4
+#define BOOST_THREAD_USES_LOG
+#define BOOST_THREAD_USES_LOG_THREAD_ID
+
+#include <boost/thread/detail/log.hpp>
+#include <boost/thread/thread_pool.hpp>
+#include <boost/thread/executor.hpp>
+#include <boost/assert.hpp>
+#include <string>
+
+void p1()
+{
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " P1" << BOOST_THREAD_END_LOG;
+}
+
+void p2()
+{
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " P2" << BOOST_THREAD_END_LOG;
+}
+
+void push(boost::container::deque<boost::detail::function_wrapper> &data_, BOOST_THREAD_RV_REF(boost::detail::function_wrapper) closure)
+{
+  try
+  {
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+    boost::detail::function_wrapper  v;
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+    //v = boost::move(closure);
+    //v = boost::forward<boost::detail::function_wrapper>(closure);
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+
+    data_.push_back(boost::move(closure));
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+
+    //data_.push_back(boost::forward<boost::detail::function_wrapper>(closure));
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+
+  }
+  catch (std::exception& ex)
+  {
+    BOOST_THREAD_LOG
+      << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+  }
+  catch (...)
+  {
+    BOOST_THREAD_LOG
+      << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+  }
+}
+
+template <typename Closure>
+void submit(boost::container::deque<boost::detail::function_wrapper> &data_, BOOST_THREAD_FWD_REF(Closure) closure)
+{
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+  //work w =boost::move(closure);
+  //work_queue.push(boost::move(w));
+  //push(data_, boost::detail::function_wrapper(boost::forward<Closure>(closure)));
+  boost::detail::function_wrapper  v =boost::forward<Closure>(closure);
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+  push(data_, boost::move(v));
+
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+}
+
+int main()
+{
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+#if 0
+  {
+    try
+    {
+      boost::detail::function_wrapper f(&p1);
+
+    boost::container::deque<boost::detail::function_wrapper> data_;
+    data_.push_back(boost::move(f));
+    data_.push_back(boost::detail::function_wrapper(&p1));
+    submit(data_, &p1);
+    }
+    catch (std::exception& ex)
+    {
+      BOOST_THREAD_LOG
+        << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+    }
+    catch (...)
+    {
+      BOOST_THREAD_LOG
+        << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+    }
+
+    typedef boost::container::vector<boost::thread> thread_vector;
+    thread_vector threads;
+
+  }
+#endif
+#if 1
+  {
+    try
+    {
+      boost::executor_adaptor<boost::thread_pool> ea;
+      boost::executor &tp=ea;
+      BOOST_THREAD_LOG
+        << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+      tp.submit(&p1);
+      BOOST_THREAD_LOG
+        << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+    }
+    catch (std::exception& ex)
+    {
+      BOOST_THREAD_LOG
+        << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+      return 1;
+    }
+    catch (...)
+    {
+      BOOST_THREAD_LOG
+        << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+      return 2;
+    }
+  }
+#endif
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << "MAIN>" << BOOST_THREAD_END_LOG;
+  return 0;
+}
Modified: trunk/libs/thread/test/Jamfile.v2
==============================================================================
--- trunk/libs/thread/test/Jamfile.v2	Fri Oct  4 15:02:11 2013	(r86160)
+++ trunk/libs/thread/test/Jamfile.v2	2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013)	(r86161)
@@ -719,6 +719,7 @@
           [ thread-run2 ../example/lambda_future.cpp : ex_lambda_future ]
           [ thread-run2 ../example/not_interleaved2.cpp : ex_not_interleaved2 ]
           [ thread-run2 ../example/thread_pool.cpp : ex_thread_pool ]
+          [ thread-run2 ../example/executor.cpp : ex_executor ]
 
     ;
 
@@ -798,8 +799,7 @@
           #[ thread-run test_9079_a.cpp ]
           #[ thread-run test_9079_b.cpp ]
           #[ thread-run clang_main.cpp ]
-          
-          
+          #[ thread-run test_8557.cpp ]
 
     ;