$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r85855 - in trunk: boost/thread libs/thread/doc libs/thread/example libs/thread/test
From: vicente.botet_at_[hidden]
Date: 2013-09-23 12:45:00
Author: viboes
Date: 2013-09-23 12:45:00 EDT (Mon, 23 Sep 2013)
New Revision: 85855
URL: http://svn.boost.org/trac/boost/changeset/85855
Log:
Thread: added first thread_pool.
Added:
   trunk/boost/thread/thread_pool.hpp   (contents, props changed)
   trunk/libs/thread/example/thread_pool.cpp   (contents, props changed)
Text files modified: 
   trunk/boost/thread/thread_pool.hpp        |   192 ++++++++++++++++++++++++++++++++++++++++
   trunk/libs/thread/doc/thread.qbk          |     2                                         
   trunk/libs/thread/example/thread_pool.cpp |   147 ++++++++++++++++++++++++++++++          
   trunk/libs/thread/test/Jamfile.v2         |     7 +                                       
   4 files changed, 345 insertions(+), 3 deletions(-)
Added: trunk/boost/thread/thread_pool.hpp
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ trunk/boost/thread/thread_pool.hpp	2013-09-23 12:45:00 EDT (Mon, 23 Sep 2013)	(r85855)
@@ -0,0 +1,192 @@
+// Copyright (C) 2013 Vicente J. Botet Escriba
+//
+//  Distributed under the Boost Software License, Version 1.0. (See accompanying
+//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// 2013/09 Vicente J. Botet Escriba
+//    Adapt to boost from CCIA C++11 implementation
+//    first implementation of a simple thread pool using a vector of threads and a sync_queue.
+
+#ifndef BOOST_THREAD_THREAD_POOL_HPP
+#define BOOST_THREAD_THREAD_POOL_HPP
+
+#include <boost/thread/detail/config.hpp>
+#include <boost/thread/detail/delete.hpp>
+#include <boost/thread/detail/move.hpp>
+#include <boost/thread/scoped_thread.hpp>
+#include <boost/thread/sync_queue.hpp>
+#include <boost/thread/detail/function_wrapper.hpp>
+
+#ifdef BOOST_NO_CXX11_HDR_FUNCTIONAL
+#include <boost/function.hpp>
+#else
+#include <functional>
+#endif
+
+#if defined  BOOST_NO_CXX11_RVALUE_REFERENCES
+#include <boost/container/vector.hpp>
+#else
+#include <vector>
+#endif
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+
+  class thread_pool
+  {
+    /// Type-erased callable holding one unit of work.
+    typedef  detail::function_wrapper work;
+    /// Workers are scoped threads to ensure that the threads are joined.
+    typedef scoped_thread<> thread_t;
+    /// A move aware vector type used to store the workers.
+#if defined  BOOST_NO_CXX11_RVALUE_REFERENCES
+    typedef container::vector<thread_t> thread_vector;
+#else
+    typedef std::vector<thread_t> thread_vector;
+#endif
+
+    /// The thread safe work queue; declared before the threads so it is destroyed after them.
+    sync_queue<work > work_queue;
+    /// The worker threads.
+    thread_vector threads;
+
+    /**
+     * Effects: tries to pull one task from the queue and run it.
+     * Returns: true only if a task was pulled and ran without throwing.
+     * Throws: only what default-constructing @c work throws; exceptions from
+     * the pull or from the task are caught and reported as a false result.
+     */
+    bool try_executing_one()
+    {
+      work task;
+      try
+      {
+        if (work_queue.try_pull(task))
+        {
+          task();
+          return true;
+        }
+      }
+      catch (...)
+      {
+        // Swallowed so that no exception escapes into the calling worker loop.
+      }
+      return false;
+    }
+    /**
+     * Effects: executes one pending task, or yields the processor when
+     * try_executing_one() reports that no task was executed.
+     */
+    void schedule_one_or_yield()
+    {
+        if ( ! try_executing_one())
+        {
+          this_thread::yield();
+        }
+    }
+    /**
+     * The main loop of the worker threads: serves tasks until the pool is closed, then
+     * keeps executing queued tasks until try_executing_one() reports that none was run.
+     */
+    void worker_thread()
+    {
+      while (!is_closed())
+      {
+        schedule_one_or_yield();
+      }
+      while (try_executing_one())
+      {
+      }
+    }
+
+  public:
+    /// thread_pool is not copyable.
+    BOOST_THREAD_NO_COPYABLE(thread_pool)
+
+    /**
+     * Effects: creates a thread pool that runs closures on @c thread_count threads.
+     * Throws: rethrows whatever starting a worker thread throws, after closing the queue.
+     */
+    thread_pool(unsigned const thread_count = thread::hardware_concurrency())
+    {
+      try
+      {
+        for (unsigned i = 0; i < thread_count; ++i)
+        {
+          threads.push_back(thread_t(&thread_pool::worker_thread, this));
+        }
+      }
+      catch (...)
+      {
+        close();
+        throw;
+      }
+    }
+    /**
+     * Effects: Destroys the thread pool.
+     * Synchronization: the destructor completes only after every worker has left worker_thread() and been joined.
+     */
+    ~thread_pool()
+    {
+      // signal to all the worker threads that there will be no more submissions.
+      close();
+      // joins all the threads as the threads were scoped_threads
+    }
+
+    /**
+     * Effects: closes the thread_pool for submissions. The worker threads keep working
+     * until they observe the close and a following try_executing_one() runs nothing.
+     */
+    void close()
+    {
+      work_queue.close();
+    }
+
+    /**
+     * Returns: whether the pool is closed for submissions.
+     */
+    bool is_closed()
+    {
+      return work_queue.closed();
+    }
+
+    /**
+     * Effects: The specified function will be scheduled for execution at some point in the future.
+     * An exception thrown by the closure is caught and discarded by the worker that runs it.
+     * Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+     * Throws: sync_queue_is_closed if the thread pool is closed.
+     */
+    template <typename Closure>
+    void submit(Closure const& closure)
+    {
+      work w ((closure));
+      work_queue.push(boost::move(w));
+    }
+    template <typename Closure>
+    void submit(BOOST_THREAD_RV_REF(Closure) closure)
+    {
+      work w =boost::move(closure);
+      work_queue.push(boost::move(w));
+    }
+
+    /**
+     * This must be called from an scheduled task.
+     * Effects: reschedule functions until pred()
+     */
+    template <typename Pred>
+    void reschedule_until(Pred const& pred)
+    {
+      do {
+        schedule_one_or_yield();
+      } while (! pred());
+    }
+
+  };
+
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
Modified: trunk/libs/thread/doc/thread.qbk
==============================================================================
--- trunk/libs/thread/doc/thread.qbk	Mon Sep 23 12:44:26 2013	(r85854)
+++ trunk/libs/thread/doc/thread.qbk	2013-09-23 12:45:00 EDT (Mon, 23 Sep 2013)	(r85855)
@@ -249,7 +249,7 @@
 
 [section:sds Synchronized Data Structures]
 [include synchronized_value.qbk]
-[/include sync_queues_ref.qbk]
+[include sync_queues_ref.qbk]
 [/include sync_streams.qbk]
 [endsect]
 
Added: trunk/libs/thread/example/thread_pool.cpp
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ trunk/libs/thread/example/thread_pool.cpp	2013-09-23 12:45:00 EDT (Mon, 23 Sep 2013)	(r85855)
@@ -0,0 +1,147 @@
+// Copyright (C) 2012-2013 Vicente Botet
+//
+//  Distributed under the Boost Software License, Version 1.0. (See accompanying
+//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#define BOOST_THREAD_VERSION 4
+#define BOOST_THREAD_USES_LOG
+#define BOOST_THREAD_USES_LOG_THREAD_ID
+
+#include <boost/thread/detail/log.hpp>
+#include <boost/thread/thread_pool.hpp>
+#include <boost/assert.hpp>
+#include <string>
+
+/// Sample task: streams the id of the executing thread followed by " P1" to BOOST_THREAD_LOG.
+void p1()
+{
+  BOOST_THREAD_LOG << boost::this_thread::get_id() << " P1" << BOOST_THREAD_END_LOG;
+}
+
+/// Sample task: streams the id of the executing thread followed by " P2" to BOOST_THREAD_LOG.
+void p2()
+{
+  BOOST_THREAD_LOG << boost::this_thread::get_id() << " P2" << BOOST_THREAD_END_LOG;
+}
+
+/// Moves @c closure to the back of @c data_, emitting " <MAIN" trace records
+/// (tagged with the current thread id) through BOOST_THREAD_LOG around the insertion.
+/// An exception raised inside the try block is reported through BOOST_THREAD_LOG
+/// instead of being propagated to the caller.
+void push(boost::container::deque<boost::detail::function_wrapper> &data_, BOOST_THREAD_RV_REF(boost::detail::function_wrapper) closure)
+{
+  try
+  {
+    // Three trace records before the insertion and two after it, as in the original trace.
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+
+    data_.push_back(boost::move(closure));
+
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+    BOOST_THREAD_LOG
+      << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+  }
+  catch (const std::exception& ex)
+  {
+    BOOST_THREAD_LOG
+      << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+  }
+  catch (...)
+  {
+    BOOST_THREAD_LOG
+      << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+  }
+}
+
+/// Wraps the forwarded @c closure in a boost::detail::function_wrapper and passes
+/// that wrapper to push() together with @c data_. A " <MAIN" record tagged with the
+/// current thread id is streamed to BOOST_THREAD_LOG on entry and after each step.
+template <typename Closure>
+void submit(boost::container::deque<boost::detail::function_wrapper> &data_, BOOST_THREAD_FWD_REF(Closure) closure)
+{
+  BOOST_THREAD_LOG << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+
+  // Step 1: type-erase the closure.
+  boost::detail::function_wrapper wrapped = boost::forward<Closure>(closure);
+  BOOST_THREAD_LOG << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+
+  // Step 2: move the wrapper into the container via push().
+  push(data_, boost::move(wrapped));
+  BOOST_THREAD_LOG << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+}
+
+int main() // Example driver: submits p1/p2 alternately to a thread_pool; returns 0, or 1/2 when an exception is caught.
+{
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+#if 0 // Disabled scratch code: exercises function_wrapper with a deque directly, without a pool.
+  {
+    try
+    {
+      boost::detail::function_wrapper f(&p1);
+      // The following statements are still inside this try block despite their shallower indentation.
+    boost::container::deque<boost::detail::function_wrapper> data_;
+    data_.push_back(boost::move(f));
+    data_.push_back(boost::detail::function_wrapper(&p1));
+    submit(data_, &p1);
+    }
+    catch (std::exception& ex)
+    {
+      BOOST_THREAD_LOG
+        << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+    }
+    catch (...)
+    {
+      BOOST_THREAD_LOG
+        << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+    }
+    // The vector below is declared but never used in this disabled block.
+    typedef boost::container::vector<boost::thread> thread_vector;
+    thread_vector threads;
+
+  }
+#endif
+#if 1 // Active scenario: ten submissions (p1 and p2 alternating) to a default-constructed pool.
+  {
+    try
+    {
+      boost::thread_pool tp; // thread count defaults to thread::hardware_concurrency()
+      BOOST_THREAD_LOG
+        << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+      tp.submit(&p1);
+      BOOST_THREAD_LOG
+        << boost::this_thread::get_id()  << " <MAIN" << BOOST_THREAD_END_LOG;
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+      tp.submit(&p1);
+      tp.submit(&p2);
+    } // tp leaves scope here: ~thread_pool() closes the queue and the worker threads are joined
+    catch (std::exception& ex)
+    {
+      BOOST_THREAD_LOG
+        << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+      return 1; // a std::exception was caught
+    }
+    catch (...)
+    {
+      BOOST_THREAD_LOG
+        << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+      return 2; // any other exception was caught
+    }
+  }
+#endif
+  BOOST_THREAD_LOG
+    << boost::this_thread::get_id()  << "MAIN>" << BOOST_THREAD_END_LOG;
+  return 0;
+}
Modified: trunk/libs/thread/test/Jamfile.v2
==============================================================================
--- trunk/libs/thread/test/Jamfile.v2	Mon Sep 23 12:44:26 2013	(r85854)
+++ trunk/libs/thread/test/Jamfile.v2	2013-09-23 12:45:00 EDT (Mon, 23 Sep 2013)	(r85855)
@@ -717,6 +717,7 @@
           [ thread-run2 ../example/not_interleaved.cpp : ex_not_interleaved ]
           [ thread-run2 ../example/lambda_future.cpp : ex_lambda_future ]
           [ thread-run2 ../example/not_interleaved2.cpp : ex_not_interleaved2 ]
+          [ thread-run2 ../example/thread_pool.cpp : ex_thread_pool ]
 
     ;
 
@@ -793,9 +794,11 @@
           #[ thread-run test_8600.cpp ]
           #[ thread-run test_8943.cpp ]
           #[ thread-run test_8960.cpp ]
-          [ thread-run test_9079_a.cpp ]
-          [ thread-run test_9079_b.cpp ]
+          #[ thread-run test_9079_a.cpp ]
+          #[ thread-run test_9079_b.cpp ]
           #[ thread-run clang_main.cpp ]
+          
+          
 
     ;