$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r48636 - in sandbox/SOC/2007/signals: boost/dataflow/signals/component boost/dataflow/utility libs/dataflow/build/xcodeide/dataflow.xcodeproj libs/dataflow/example/threadpool
From: stipe_at_[hidden]
Date: 2008-09-06 16:05:08
Author: srajko
Date: 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
New Revision: 48636
URL: http://svn.boost.org/trac/boost/changeset/48636
Log:
new threadpool example
Added:
   sandbox/SOC/2007/signals/boost/dataflow/utility/bind_functor.hpp   (contents, props changed)
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp   (contents, props changed)
Text files modified: 
   sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp                |     4 ++--                                    
   sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj |     4 ++++                                    
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile                        |     1 +                                       
   3 files changed, 7 insertions(+), 2 deletions(-)
Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp	(original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp	2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -11,7 +11,7 @@
 #include <boost/dataflow/support/port/port_adapter.hpp>
 
 #include <boost/signal.hpp>
-
+#include <boost/mpl/vector.hpp>
 
 namespace boost {  namespace signals {
 
@@ -136,7 +136,7 @@
     };
 };
 
-template<typename Filter, typename Signals, typename InSignatures>
+template<typename Filter, typename Signals, typename InSignatures = mpl::vector<> >
 class filter_base : public dataflow::component<filter_component_traits<Filter, Signals, InSignatures> >
 {
 public:
Added: sandbox/SOC/2007/signals/boost/dataflow/utility/bind_functor.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/boost/dataflow/utility/bind_functor.hpp	2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -0,0 +1,54 @@
+// Copyright 2008 Stjepan Rajko.
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_DATAFLOW__UTILITY__BIND_FUNCTOR_HPP
+#define BOOST_DATAFLOW__UTILITY__BIND_FUNCTOR_HPP
+
+
+#include <boost/bind.hpp>
+#include <boost/bind/placeholders.hpp>
+#include <boost/function.hpp>
+
+
+namespace boost { namespace dataflow {
+
+namespace utility {
+
+    struct bind_functor
+    {
+        typedef boost::function<void()> result_type;
+        
+        template<typename T0>
+        result_type operator()(const T0 &t0)
+        {
+            return boost::bind(t0);
+        }
+
+        template<typename T0, typename T1>
+        result_type operator()(const T0 &t0, const T1 &t1)
+        {
+            return boost::bind(t0, t1);
+        }
+
+        template<typename T0, typename T1, typename T2>
+        result_type operator()(const T0 &t0, const T1 &t1, const T2 &t2)
+        {
+            return boost::bind(t0, t1, t2);
+        }
+
+        template<typename T0, typename T1, typename T2, typename T3>
+        result_type operator()(const T0 &t0, const T1 &t1, const T2 &t2, const T3 &t3)
+        {
+            return boost::bind(t0, t1, t2, t3);
+        }
+        
+        //...
+    };
+
+} // namespace utility
+
+} } // namespace boost::dataflow
+
+#endif // BOOST_DATAFLOW__UTILITY__BIND_FUNCTOR_HPP
Modified: sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj	(original)
+++ sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj	2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -143,6 +143,7 @@
                 089C098C0E0B11FF00397123 /* test_network.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_network.cpp; sourceTree = "<group>"; };
                 089CDA940D832AD200731C70 /* tutorial.qbk */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = tutorial.qbk; sourceTree = "<group>"; };
                 089CDAA90D8333CC00731C70 /* unary_operation.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = unary_operation.hpp; sourceTree = "<group>"; };
+		089DE1580E730C100038115E /* bind_functor.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = bind_functor.hpp; sourceTree = "<group>"; };
                 089E78C20E132E220008C0BB /* dynamic_multi_port.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = dynamic_multi_port.hpp; sourceTree = "<group>"; };
                 089F845A0E2CFAB900F6B668 /* lazy_sequence.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = lazy_sequence.hpp; sourceTree = "<group>"; };
                 089F845E0E2D032E00F6B668 /* test_lazy_sequence.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_lazy_sequence.cpp; sourceTree = "<group>"; };
@@ -200,6 +201,7 @@
                 08A364C10E129DC7001E6002 /* test_dynamic_port.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_dynamic_port.cpp; sourceTree = "<group>"; };
                 08A438900E295AE7009845FD /* component.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = component.cpp; sourceTree = "<group>"; };
                 08A439B80E295B8B009845FD /* Jamfile */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.jam; path = Jamfile; sourceTree = "<group>"; };
+		08A6948E0E71D7100065ECFD /* threadpool_component_example.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = threadpool_component_example.cpp; sourceTree = "<group>"; };
                 08A6B2650E25A566005539F2 /* blueprint_bank.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = blueprint_bank.cpp; sourceTree = "<group>"; };
                 08A6B2660E25A566005539F2 /* blueprint_bank.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = blueprint_bank.hpp; sourceTree = "<group>"; };
                 08A6B2670E25A566005539F2 /* blueprint_component.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = blueprint_component.cpp; sourceTree = "<group>"; };
@@ -757,6 +759,7 @@
                                 08E228DF0E6207EC00D1C2AF /* boost */,
                                 08E228FE0E62084100D1C2AF /* Jamfile */,
                                 08E229020E6208AC00D1C2AF /* threadpool_example.cpp */,
+				08A6948E0E71D7100065ECFD /* threadpool_component_example.cpp */,
                         );
                         path = threadpool;
                         sourceTree = "<group>";
@@ -1052,6 +1055,7 @@
                                 089AE6390D79D95C00AB9DA8 /* member_function_signature.hpp */,
                                 08C9D7DA0D83C5EB00354FF8 /* bind_mem_fn_overload.hpp */,
                                 08AD8AC40D84E3A9008A9764 /* has_call_operator.hpp */,
+				089DE1580E730C100038115E /* bind_functor.hpp */,
                         );
                         path = utility;
                         sourceTree = "<group>";
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile	(original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile	2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -13,3 +13,4 @@
     ;
 
 exe threadpool_example : threadpool_example.cpp ;
+exe threadpool_component_example : threadpool_component_example.cpp ;
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp	2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -0,0 +1,246 @@
+// Copyright 2008 Stjepan Rajko.
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <iostream>
+#include <cstdlib>
+#include <stdexcept>
+
+#include <boost/future/future.hpp>
+#include <boost/fusion/include/fused.hpp>
+#include <boost/fusion/include/join.hpp>
+#include <boost/fusion/include/make_vector.hpp>
+#define signalslib signals
+#define signals signals
+#include <boost/thread_safe_signal.hpp>
+//#include <boost/signals.hpp>
+
+#include <boost/dataflow/signals/component/storage.hpp>
+#include <boost/dataflow/signals/component/function.hpp>
+#include <boost/dataflow/signals/connection/operators.hpp>
+#include <boost/dataflow/utility/bind_mem_fn.hpp>
+#include <boost/dataflow/utility/bind_functor.hpp>
+
+#include "boost/tp/fifo.hpp"
+#include "boost/tp/lazy.hpp"
+#include "boost/tp/pool.hpp"
+#include "boost/tp/poolsize.hpp"
+#include "boost/tp/bounded_channel.hpp"
+#include "boost/tp/priority.hpp"
+
+
+namespace tp = boost::tp;
+
+template<typename Threadpool, typename Component>
+class async_component;
+
+namespace detail {
+
+    template<typename Threadpool, typename Component>
+    class async_component_impl
+    // filter_base is like filter, but it doesn't come with it's own signal.
+    // instead, the derived class must provide a default_signal member function
+    // that refers to the default signal.  We will use this to return the
+    // default signal of the underlying component.
+         : public boost::signals::filter_base<async_component<Threadpool, Component>, typename Component::signal_type >
+    {
+    public:
+        typedef typename Component::signature_type signature_type;
+        typedef typename Component::signal_type signal_type;
+        typedef void result_type;
+        
+        template<typename T0>
+        async_component_impl(Threadpool &threadpool, const T0 &t0)
+            : m_component(t0), m_threadpool(threadpool)
+        {
+            // record the appropriate operator() overload of Component into m_component_function,
+            // so we can submit it as a task later
+            typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;
+
+            m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
+                (static_cast<mem_fn_type>(&Component::operator()), m_component);
+        }
+        
+        template <class Seq>
+        void operator()(const Seq &vec_par) const
+        {
+            // add the next function as a task in the pool
+            std::cout << "adding task" << std::endl;
+            // bind_functor is just a function object that calls bind
+            // we create a fused version so we can call it with a fusion sequence
+            boost::fusion::fused<boost::dataflow::utility::bind_functor> fused_bind;
+            
+            // submit the task (the first parameter to bind is the function,
+            // and the rest are the bound function arguments).
+            boost::tp::task< void > t(
+                m_threadpool.submit(
+                    fused_bind(
+                        boost::fusion::join(
+                            boost::fusion::make_vector(m_component_function),
+                            vec_par
+                    )   )
+                    ));
+        }
+        // with this, anything that connects to the async_component's default
+        // signal will actually connect to the default signal of the underlying
+        // component
+        typename Component::signal_type &default_signal()
+        {
+            namespace dataflow = boost::dataflow;
+            return dataflow::get_default_port<
+                    dataflow::args::left,
+                    dataflow::signals::connect_mechanism, 
+                    dataflow::signals::tag
+                > (m_component);
+        }
+
+    private:
+        Component m_component;
+        Threadpool &m_threadpool;
+        boost::function<signature_type> m_component_function;
+    };
+}
+
+
+// our new async_component class - it will create a new task for the underlying
+// component when it's operator() is called.
+template<typename Threadpool, typename Component>
+class async_component : public boost::fusion::unfused_inherited<
+    detail::async_component_impl<Threadpool,Component>,
+    typename boost::function_types::parameter_types<typename Component::signature_type>::type >
+{
+    typedef boost::fusion::unfused_inherited<
+        detail::async_component_impl<Threadpool,Component>,
+        typename boost::function_types::parameter_types<typename Component::signature_type>::type>
+        base_type;
+public:
+    template<typename T0>
+    async_component(Threadpool &threadpool, const T0 &t0)
+        : base_type(threadpool, t0)
+    {}
+};
+
+// just an operation to work with
+int inc_fn(int x)
+{
+    std::cout << "filter: " << x+1 << std::endl;
+    return x+1;
+}
+
+// a component that displays text, waits a second, and then sends a signal
+class printer : public boost::signals::filter<printer, void()>
+{
+public:
+    printer(const std::string &text)
+        : m_text(text)
+    {}
+    void operator()()
+    {
+        std::cout << m_text << std::endl;
+        boost::this_thread::sleep(boost::posix_time::seconds(1));
+        out();
+    }
+private:
+    std::string m_text;
+};
+
+// a function to submit the first task
+template<typename Threadpool, typename Next>
+void submit(Threadpool &threadpool, Next &next)
+{
+    tp::task< void > task(
+        threadpool.submit(
+            boost::bind(&Next::send, boost::ref(next))));
+}
+
+int main( int argc, char *argv[])
+{
+   typedef 
+        tp::pool<
+            tp::fixed,
+            tp::unbounded_channel< tp::fifo >
+        > threadpool_type;
+        
+    threadpool_type pool(tp::max_poolsize(5));
+
+    typedef boost::signals::storage<void(int)> source_type;
+    typedef boost::signals::function<void(int), int(int)> filter_type;
+    typedef async_component<threadpool_type, filter_type> async_filter_type;
+
+    // our components
+    source_type source(1);
+    async_filter_type increase1(pool, inc_fn);
+    filter_type increase2(inc_fn);
+    filter_type increase3(inc_fn);
+    async_filter_type increase4(pool, inc_fn);
+    async_filter_type increase5(pool, inc_fn);
+    
+    // our network
+    //  increase1 >>= increase2 >>= increase3 will be in its own thread
+    //  increase3 will be in its own thread
+    //  increase4 will be in its own thread
+    source
+        | (increase1 >>= increase2 >>= increase3)
+        | (increase4 >>= increase5);
+
+    // submit the first task
+    submit(pool, source);
+           
+    // wait a little
+    boost::this_thread::sleep(boost::posix_time::seconds(1));
+
+    // --------------------------------------------------------------
+
+/*
+   typedef 
+        tp::pool<
+            tp::fixed,
+            tp::unbounded_channel< tp::priority< int > >
+        > priority_threadpool_type;
+        
+    priority_threadpool_type priority_pool(tp::max_poolsize(5));
+    
+    typedef boost::signals::storage<void()> print_starter_type;
+    typedef async_component<priority_threadpool_type, printer> async_printer_type;
+    
+    print_starter_type print_starter;
+    async_printer_type print1(priority_pool, "World");
+    async_printer_type print2(priority_pool, " ");
+    async_printer_type print3(priority_pool, "Hello");
+    async_printer_type print4(priority_pool, "!");
+    
+    print_starter
+        | print1
+        | print2
+        | print3
+        | print4;
+        
+    // submit the first task
+    submit(priority_pool, print_starter);
+
+    // wait a little
+    boost::this_thread::sleep(boost::posix_time::seconds(1));
+*/
+        
+    // --------------------------------------------------------------
+
+    typedef boost::signals::storage<void()> tick_starter_type;
+    typedef async_component<threadpool_type, printer> ticker_type;
+    
+    // our components
+    tick_starter_type tick_starter;
+    ticker_type ticker1(pool, "tick 1..."), ticker2(pool, "tick 2..."), ticker3(pool, "tick 3...");
+
+    // our network
+    tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
+    
+    // submit the first task
+    submit(pool, tick_starter);
+
+    // wait a little
+    boost::this_thread::sleep(boost::posix_time::seconds(5));
+
+    pool.shutdown();
+    return 0;
+}