$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r51111 - in sandbox/interthreads/libs/interthreads/example: . detail
From: vicente.botet_at_[hidden]
Date: 2009-02-08 16:10:44
Author: viboes
Date: 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
New Revision: 51111
URL: http://svn.boost.org/trac/boost/changeset/51111
Log:
interthreads version 0.4
    *  New free functions for all the AsynchronousCompletionToken  operations, providing a higher degree of freedom.
    * Missing have_all_values(), have_all_exception() and are_all_ready() functions on AsynchronousCompletionToken fusion tuples.
    * get_all: getting all the values from a tuple of AsynchronousCompletionToken works now.
    * fork_after overloaded for a single dependency
    * wait_all overloaded for a single ACT.
    * wait_for_all evaluate one of its elements on the current thread
    * No need to use wait_and_get() on thread_specific_shared_ptr<> to synchronize with the decoration if the thread is created using a AsynchronousExecutor decorator. In this case the synchro is done before returning the AsynchronousCompletionToken. See the tutorial and the mono_thread_id example.
Text files modified: 
   sandbox/interthreads/libs/interthreads/example/async_ostream.cpp             |    80 +++++++++++++++++++++++++++++---------- 
   sandbox/interthreads/libs/interthreads/example/async_ostream.hpp             |     4 +                                       
   sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp          |    66 +++++++++++++++++---------------        
   sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp |     3 +                                       
   sandbox/interthreads/libs/interthreads/example/hello_world.cpp               |     2                                         
   sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp            |    18 +++++----                               
   sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp       |    21 +++++-----                              
   sandbox/interthreads/libs/interthreads/example/timestamped.hpp               |     6 +-                                      
   8 files changed, 124 insertions(+), 76 deletions(-)
Modified: sandbox/interthreads/libs/interthreads/example/async_ostream.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/async_ostream.cpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/async_ostream.cpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,6 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. Distributed under the Boost
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. Distributed under the Boost
 // Software License, Version 1.0. (See accompanying file
 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -21,7 +22,7 @@
 #include <iostream>                             // std::ostream
 #include <sstream>                              // std::stringstream
 
-boost::mutex out_global_mutex;
+//boost::mutex out_global_mutex;
 
 namespace boost {
 namespace interthreads {
@@ -37,6 +38,10 @@
         : current_(new element_type()) 
         , seq_(0)
         {}
+            
+        ~async_ostream_thread_ctx() {
+            delete current_;
+        }
 
         boost::mutex& get_mutex() {return mutex_;}
         std::stringstream& buffer() {return current_->value_;}
@@ -70,7 +75,6 @@
         }
         void print_stats() {
             boost::thread::id id= boost::this_thread::get_id();
-            boost::lock_guard<boost::mutex> lock(out_global_mutex);
             std::cout << "TID=" << id <<" nb_push " << nb_push  << std::endl;
             std::cout << "TID=" << id <<" nb_push_gt_1 " << nb_push  << std::endl;
             std::cout << "TID=" << id <<" nb_get " << nb_push  << std::endl;
@@ -78,6 +82,7 @@
             std::cout << "TID=" << id <<" inc " << inc_  << std::endl;
         };
         bool empty() {
+            boost::lock_guard<boost::mutex> lock(mutex_);
             return queue_.empty();
         }
         void inc() {
@@ -97,7 +102,10 @@
             return  e;
         }
         element_type *current_;
+#if 1        
         queue_type queue_;   
+#else
+#endif
         unsigned seq_;   
         boost::mutex mutex_;
         unsigned nb_push_gt_1;
@@ -111,29 +119,34 @@
     //==========================================================================================
 #ifdef CONCENTRATOR            
     struct async_ostream_concentrator {
-		static void loop(async_ostream_sink::impl* that);
-		async_ostream_concentrator(async_ostream_sink::impl* impl_ptr) 
+        static void loop(async_ostream_sink::impl* that);
+        async_ostream_concentrator(async_ostream_sink::impl* impl_ptr) 
             : thread_(boost::bind(loop, impl_ptr)) {}
-		~async_ostream_concentrator() {}
+        ~async_ostream_concentrator() {}
 
-	private:
-		boost::thread thread_;
-	};
+    private:
+        boost::thread thread_;
+    };
 #endif            
-	
+    
     //==========================================================================================
     
     typedef thread_specific_shared_ptr<async_ostream_thread_ctx> tsss_type;
     
     struct async_ostream_sink::impl {
         impl(std::ostream& os) 
-        : os_(os)
+        : end_(false)
+        ,os_(os)
         , tsss_(terminate)
 #ifndef CONCENTRATOR            
         , thread_(boost::bind(loop, this)) 
 #endif            
         {}
-        
+            
+        ~impl()  {
+        }
+            
+        bool end_;
         std::ostream& os_;
         tsss_type tsss_;
         priority_queue_type queue_;
@@ -141,14 +154,13 @@
         boost::once_flag concentrator_flag_;
         async_ostream_concentrator* concentrator_;
 #else
-		boost::thread thread_;            
+        boost::thread thread_;            
 #endif            
             
         static void terminate(shared_ptr<async_ostream_thread_ctx> that) {
             while (!that->empty()) {
                 that->inc();
             }
-            //that->print_stats();            
         }
         
 #ifdef CONCENTRATOR            
@@ -160,7 +172,7 @@
                              boost::bind(create_concentrator_once, this));
         }
 #else        
-		static void loop(impl* that);
+        static void loop(impl* that);
 #endif            
         
         std::streamsize write(const char* s, std::streamsize n) {
@@ -171,6 +183,14 @@
             tsss_->flush();
         }            
         
+        void close() {
+            end_=true;
+#ifdef CONCENTRATOR            
+            concentrator_.close();
+#else
+            thread_.join();            
+#endif            
+        }            
             
     };
 
@@ -179,6 +199,10 @@
     async_ostream_sink::async_ostream_sink(std::ostream& os)
         : impl_(new async_ostream_sink::impl(os)) {}
 
+    async_ostream_sink::~async_ostream_sink()
+        {
+        }
+
     std::streamsize async_ostream_sink::write(const char* s, std::streamsize n) {
         return impl_->write(s,n);
     }
@@ -209,14 +233,16 @@
                 //it->second->print_stats();                
             }
             if (that->queue_.empty()) {
+                if (that->end_) break;
                 boost::this_thread::sleep(boost::posix_time::milliseconds(10));
             } else {
                 element_type* e = that->queue_.top();
                 that->queue_.pop();
+
     #ifdef XTIME
-                os_ << e->seq_ << "["<<e->date_.sec<<":"<<e->date_.nsec<<"]| " << e->value_.str();
+                os_ << e->seq_ << "["<<e->date_.sec<<":"<<e->date_.nsec<<"]| " << e->value_.str().length() << " | " <<  e->value_.str();
     #else
-                os_ << e->seq_ <<  "| " << e->value_.str();
+                os_ << e->seq_ <<  " | " << e->value_.str().length() <<  " | " << e->value_.str();
     #endif
                 delete e;
             }
@@ -233,14 +259,21 @@
         : base_type(os) 
         {}
 
+    async_ostream::~async_ostream()
+        {
+        cout_->impl_->close();
+
+        }
+
     void async_ostream::flush() {
-        this->base_type::flush();
+        //this->base_type::flush();
         async_ostream& d = *this;
         d->flush();
     }
             
     //==========================================================================================
     
+    // WARNING: static_variable
     async_ostream cout_(std::cout);
     
     void async_ostream::thread_specific_setup() {
@@ -249,9 +282,14 @@
         cout_->impl_->create_concentrator();
 #endif        
     }
-    
-    namespace detail 	{
-	    thread_decoration async_ostream_decoration(boost::interthreads::async_ostream::thread_specific_setup);
+
+#if 0    
+    void close() {
+        //cout_->impl_->close();
+    }
+#endif    
+    namespace detail {
+        thread_decoration async_ostream_decoration(boost::interthreads::async_ostream::thread_specific_setup);
     }
     
 }
Modified: sandbox/interthreads/libs/interthreads/example/async_ostream.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/async_ostream.hpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/async_ostream.hpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
 
 //////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. 
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. 
 // Distributed under the Boost Software License, Version 1.0. 
 // (See accompanying file LICENSE_1_0.txt or 
 //  copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -30,9 +30,11 @@
         typedef iostreams::sink_tag    category;
 
         async_ostream(std::ostream& os);
+        ~async_ostream();
         void flush();
 
         static void thread_specific_setup();
+        static void close();
     };
     
     extern async_ostream cout_;
Modified: sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006. 
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. 
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. 
 // Distributed under the Boost Software License, Version 1.0. 
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -12,26 +12,25 @@
 
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/thread.hpp>
-          
+#define COUT_
+#ifdef COUT_
+#else          
 boost::mutex out_global_mutex2;
-
+#endif
 void sleep(int sec)
 {
-	boost::xtime t;
-	boost::xtime_get(&t,1);	
+    boost::xtime t;
+    boost::xtime_get(&t,1);
     t.sec += sec; 
     boost::thread::sleep(t);
 }
 
 
-//    #include <boost/interthreads/threader.hpp>
-//    #include <boost/interthreads/joiner_tuple.hpp>
 #include <boost/interthreads/thread_decorator.hpp>
 #include <boost/interthreads/thread_keep_alive.hpp>
 
 #include <boost/interthreads/typeof/threader_decorator.hpp>
-#include <boost/interthreads/wait_for_all.hpp>
-//#include <boost/type_traits.hpp>
+#include <boost/interthreads/algorithm.hpp>
 
 #include "./async_ostream.hpp"
 #include <boost/thread/thread.hpp>
@@ -40,19 +39,39 @@
 #include <sstream>
         
 namespace bith = boost::interthreads;
+void my_thread1_on_dead_thread(boost::thread::id id, boost::thread*) {
+    std::cout << "my_thread1 " << id << " do not responds to keep-alive" << std::endl;
+}
 
-int my_thread1() {
-    sleep(2);
+
+int my_thread11() {
+    bith::this_thread::enable_keep_alive enabler;
+    bith::this_thread::set_on_dead_thread(my_thread1_on_dead_thread);     
+    boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
+    bith::this_thread::keep_alive_point();            
+    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+    bith::this_thread::keep_alive_point();            
+    boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+    bith::this_thread::keep_alive_point();            
+
+#ifdef COUT_
+        bith::cout_ << "my_thread1" << std::endl;
+        bith::cout_.flush();
+#else
+        {
+            boost::lock_guard<boost::mutex> lock(out_global_mutex2);
+            std::cout << "my_thread1" << std::endl;
+        }
+#endif            
     return 0;
 }
 int my_thread() {
     bith::this_thread::enable_keep_alive enabler;
-    for (int i=0; i<10; i++) {
+    for (unsigned i=0; i<10; i++) {
         bith::this_thread::keep_alive_point();            
 
         boost::thread::id id= boost::this_thread::get_id();
         std::stringstream sss;
-//#define COUT_
 #ifdef COUT_
         bith::cout_ << "TID=" << i << " " << id << std::endl;
         bith::cout_.flush();
@@ -62,29 +81,14 @@
             std::cout << "TID=" << i << " " << id << std::endl;
         }
 #endif            
-        boost::this_thread::sleep(boost::posix_time::milliseconds(10));
+        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
     }
     return 0;
 }
     
 int main() {
-    {
+    
     bith::shared_threader_decorator ae;
-    bith::wait_for_all(ae, my_thread, my_thread);
-    }
-    #if 0
-    boost::thread th1(bith::make_decorator(my_thread));
-    boost::thread th2(bith::make_decorator(my_thread));
-    boost::thread th3(bith::make_decorator(my_thread));
-    boost::thread th4(bith::make_decorator(my_thread));
-    boost::thread th5(bith::make_decorator(my_thread));
-    th1.join();
-    th2.join();
-    th3.join();
-    th4.join();
-    th5.join();
-    #endif
-
-        
+    bith::wait_for_all(ae, my_thread, my_thread, my_thread11);
     return 0;
 }
Modified: sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
 
 //////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. 
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. 
 // Distributed under the Boost Software License, Version 1.0. 
 // (See accompanying file LICENSE_1_0.txt or 
 //  copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -28,6 +28,7 @@
             typedef  boost::iostreams::sink_tag  category;
             
             async_ostream_sink(std::ostream& os);
+            ~async_ostream_sink();
             
             std::streamsize write(const char* s, std::streamsize n);
             void flush();
Modified: sandbox/interthreads/libs/interthreads/example/hello_world.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/hello_world.cpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/hello_world.cpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006. 
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. 
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. 
 // Distributed under the Boost Software License, Version 1.0. 
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
Modified: sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006. 
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. 
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. 
 // Distributed under the Boost Software License, Version 1.0. 
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -16,8 +16,8 @@
 
 void sleep(int sec)
 {
-	boost::xtime t;
-	boost::xtime_get(&t,1);	
+    boost::xtime t;
+    boost::xtime_get(&t,1); 
     t.sec += sec; 
     boost::thread::sleep(t);
 }
@@ -76,7 +76,7 @@
               << std::endl;
     }
 };
-
+#if 0
 struct mono_thread_id_wait_and_get {
     template<typename T>
     void operator()(T& t) const {
@@ -87,7 +87,7 @@
         }
     }
 };
-
+#endif
 bith::thread_decoration mono_thread_id::decoration_(mono_thread_id::setup);
 mono_thread_id::tssp_type mono_thread_id::current_;
 unsigned mono_thread_id::counter_=0;
@@ -107,8 +107,9 @@
 
 void doit() {
     bith::shared_threader_decorator ae;
-    bith::result_of::fork_all<bith::shared_threader_decorator, boost::fusion::tuple<void(*)(), void(*)()> >::type handles = 
-               bith::fork_all(ae, my_thread, my_thread);
+    BOOST_AUTO(handles, bith::fork_all(ae, my_thread, my_thread));
+#if 0
+    //bith::result_of::fork_all<bith::shared_threader_decorator, boost::fusion::tuple<void(*)(), void(*)()> >::type handles = 
     {
         const boost::shared_ptr<unsigned> shp1 = mono_thread_id::current_.wait_and_get(boost::fusion::at_c<0>(handles).get_id());
         if (shp1.get()==0) {
@@ -124,7 +125,8 @@
         }
     }
     //sleep(1);
-    boost::fusion::for_each(handles, mono_thread_id_wait_and_get());
+    //boost::fusion::for_each(handles, mono_thread_id_wait_and_get());
+#endif    
     boost::fusion::for_each(handles, mono_thread_id_out());
     bith::join_all(handles);
 }
Modified: sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006. 
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. 
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. 
 // Distributed under the Boost Software License, Version 1.0. 
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -16,8 +16,8 @@
 
 void sleep(int sec)
 {
-	boost::xtime t;
-	boost::xtime_get(&t,1);	
+    boost::xtime t;
+    boost::xtime_get(&t,1);
     t.sec += sec; 
     boost::thread::sleep(t);
 }
@@ -35,39 +35,42 @@
 namespace bith = boost::interthreads;
 
 int my_thread1() {
+    sleep(3);
     {
         boost::lock_guard<boost::mutex> lock(out_global_mutex);
         std::cout << "1 thread_id=" << boost::this_thread::get_id() << std::endl;
     }
-    sleep(3);
     return 0;
 }
     
 int my_thread2() {
+    sleep(1);
     {
         boost::lock_guard<boost::mutex> lock(out_global_mutex);
         std::cout << "2 thread_id=" << boost::this_thread::get_id() << std::endl;
     }
-    sleep(1);
     return 0;
 }
 
 int my_thread3() {
+    sleep(2);
     {
         boost::lock_guard<boost::mutex> lock(out_global_mutex);
         std::cout << "3 thread_id=" << boost::this_thread::get_id() << std::endl;
     }
-    sleep(2);
     return 0;
 }
 
 
 int main() {
     bith::shared_threader ae;
-    BOOST_AUTO(handles,bith::fork_all(ae, my_thread1, my_thread2, my_thread3));
     BOOST_AUTO(result,bith::wait_for_any(ae, my_thread1, my_thread2, my_thread3));
-    std::cout << "Algotithm " << result.first+1 << " finished the first with wait_for_any" << std::endl;
+    std::cout << "Algotithm " << result.first+1 << " finished the first with wait_for_any result=" << result.second << std::endl;
 
+
+#if 0
+    //BOOST_AUTO(handles,bith::fork_all(ae, my_thread1, my_thread2, my_thread3));
+    
     bith::thread_tuple<3> tt_0(my_thread1, my_thread2, my_thread3);
     bith::thread_tuple<3> tt_1;
     tt_1= tt_0.move();
@@ -80,8 +83,6 @@
     bith::thread_tuple<3> kk_0= bith::make_thread_tuple(my_thread1, my_thread2, my_thread3);
     kk_0.join_all();
     std::cout << "All finished join_all" << std::endl;
-
-#if 0
         
     bith::thread_group_once tgo;
     boost::thread* th1 = tgo.create_thread(my_thread1);
Modified: sandbox/interthreads/libs/interthreads/example/timestamped.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/timestamped.hpp	(original)
+++ sandbox/interthreads/libs/interthreads/example/timestamped.hpp	2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
 
 //////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. Distributed under the Boost
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. Distributed under the Boost
 // Software License, Version 1.0. (See accompanying file
 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -23,7 +23,7 @@
 #ifdef XTIME
         boost::xtime date_;
         void reset_date(unsigned seq) {
-	        boost::xtime_get(&date_,1);
+            boost::xtime_get(&date_,1);
             seq_ = seq;
         }
         struct ref_comparator {
@@ -56,7 +56,7 @@
 #else
         system_time date_;
         void reset_date(unsigned seq) {
-            date_ = system_time();	
+            date_ = system_time();
             seq_ = seq;
         }
         struct ref_comparator {