$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r52496 - in sandbox/join: boost boost/join boost/join/base boost/join/idioms libs libs/join libs/join/doc libs/join/examples
From: yigongliu_at_[hidden]
Date: 2009-04-19 20:00:41
Author: yigongliu
Date: 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
New Revision: 52496
URL: http://svn.boost.org/trac/boost/changeset/52496
Log:
update files to Join r6
Added:
   sandbox/join/boost/
   sandbox/join/boost/join/
   sandbox/join/boost/join/base/
   sandbox/join/boost/join/base/exceptions.hpp   (contents, props changed)
   sandbox/join/boost/join/base/join_base.hpp   (contents, props changed)
   sandbox/join/boost/join/base/joint.hpp   (contents, props changed)
   sandbox/join/boost/join/base/port.hpp   (contents, props changed)
   sandbox/join/boost/join/base/utils.hpp   (contents, props changed)
   sandbox/join/boost/join/idioms/
   sandbox/join/boost/join/idioms/asio_executor.hpp   (contents, props changed)
   sandbox/join/boost/join/idioms/executor.hpp   (contents, props changed)
   sandbox/join/boost/join/idioms/rr_executor.hpp   (contents, props changed)
   sandbox/join/boost/join/join.hpp   (contents, props changed)
   sandbox/join/libs/
   sandbox/join/libs/join/
   sandbox/join/libs/join/doc/
   sandbox/join/libs/join/doc/boost_join_design.html   (contents, props changed)
   sandbox/join/libs/join/doc/chords_joints.html   (contents, props changed)
   sandbox/join/libs/join/doc/internals.html   (contents, props changed)
   sandbox/join/libs/join/doc/references.html   (contents, props changed)
   sandbox/join/libs/join/doc/samples.html   (contents, props changed)
   sandbox/join/libs/join/doc/synopsis_port.html   (contents, props changed)
   sandbox/join/libs/join/doc/tutorials.html   (contents, props changed)
   sandbox/join/libs/join/examples/
   sandbox/join/libs/join/examples/Jamfile.v2   (contents, props changed)
   sandbox/join/libs/join/examples/async_call_ret.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/bounded_buffer.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/buffer.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/buffer_lambda.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/buffer_phoenix.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/chain.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/flows.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/join_many.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/joint_lifetime.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/prime_sieve.cpp   (contents, props changed)
   sandbox/join/libs/join/examples/producer_consumer.cpp   (contents, props changed)
Added: sandbox/join/boost/join/base/exceptions.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/exceptions.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,110 @@
+//
+// boost/join/exceptions.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_EXCEPTIONS_HPP
+#define BOOST_JOIN_EXCEPTIONS_HPP
+
+namespace boost {
+  namespace join {
+
+    class join_exception : public std::exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join-related exception";
+      }
+    };
+    class not_in_chord_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: async<>/synch<> channels not in chord exception";
+      }
+    };
+    class double_association_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: async<>/synch<> channels associated with more than one joint exception";
+      }
+    };
+    class queue_overflow_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: queue overflow exception";
+      }
+    };
+    class missing_result_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: synch<> channel missing result exception";
+      }
+    };
+    class no_executor_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: no executor exception";
+      }
+    };
+    class hidden_chord_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: hidden chord exception";
+      }
+    };
+    class too_many_ports_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: Too many channels defined in a joint";
+      }
+    };
+    class chord_override_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: Chord override failure, chord not found";
+      }
+    };
+    class chord_remove_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: Chord remove failure, chord not found";
+      }
+    };
+    class executor_missing_rr_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: executor's dynamic task queues are used, however non round robin scheduling is defined";
+      }
+    };
+    class synch_not_1st_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: synchronous channel is used not as the first method of chord";
+      }
+    };
+    class single_synch_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: more than one synchronous channels are used";
+      }
+    };
+    class port_chord_reset_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: channels or joint are reset unexpectedly";
+      }
+    };
+    class synch_time_out_exception : public join_exception {
+    public:
+      virtual const char *what() const throw () {
+        return "Join: synchronous ports time out unexpectedly";
+      }
+    };
+
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/base/join_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/join_base.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,118 @@
+//
+// boost/join/join_base.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_BASE_HPP
+#define BOOST_JOIN_BASE_HPP
+
+#include <string>
+#include <vector>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <boost/utility.hpp>
+#include <boost/join/base/utils.hpp>
+#include <boost/join/base/exceptions.hpp>
+
+namespace boost {
+  namespace join {
+    namespace detail {
+
+      class port;
+
+      class joint_base {
+      public:
+        typedef boost::function0<void> callable;
+        const char * name_;
+        logger log;
+        boost::mutex mutex_; // protects joint global status including ports and chords
+        joint_base(const char *n=0) : name_(n), log(n) {}
+        virtual ~joint_base() {}
+        virtual void reset() = 0;
+        virtual bool check_heartbeat() = 0;
+        virtual bool has_spawn(void) = 0;
+        virtual void spawn(callable) = 0;
+        virtual bool port_invoked(int ind) = 0;
+        virtual void port_revoked(int ind) = 0;
+        virtual void port_remove(int ind) = 0;
+      };
+
+      template <typename R>
+      class chord_base {
+      public:
+        virtual void capture_arguments(boost::function0<R> &cb) = 0;
+        virtual ~chord_base() {}
+      };      
+
+      class port {
+      public:
+        enum port_type {
+          async,
+          synch
+        } ;
+        port_type type_;
+        boost::shared_ptr<joint_base> joint_; //pointer to owning joint
+        int index_;  //my index in joint
+        unsigned int num_pending_; //pending calls/msgs at this port
+ 
+        port(port_type t, boost::shared_ptr<joint_base> j) : 
+          type_(t), joint_(j), index_(-1), num_pending_(0) {
+        }
+
+        virtual ~port() {
+          detach();
+        }
+
+        //called from client, detach port from joint
+        void detach() {
+          if (joint_ != 0) {
+            joint_->log.stream() << "a port is being destructed..." << joint_.use_count() << logger::endl;
+            joint_->port_remove(index_);
+            joint_.reset();
+          }
+        }
+
+        //when detached from joint, need reset
+        //caled from joint destructor
+        virtual void reset(void) {
+          if (joint_ != 0)
+            joint_->log.msg("a port is reset ...");
+          index_ = -1;
+          num_pending_ = 0;
+        }
+
+        //called from async/synch interface, need hold a shared_ptr to joint so its destructor will not be called right away
+        void reset_joint() {
+          boost::shared_ptr<joint_base> my_joint(joint_); 
+          if (my_joint != 0)
+            my_joint->reset();
+        }
+
+        //detach port from joint, only can be done by chord_remove
+        //when all its chords are removed, a port is reset & detached
+        //called from joint
+        void detach_joint(void) {
+          if (joint_ !=0) {
+            joint_->log.stream() << "a port is detached ... joint remaining count: " << joint_.use_count()-1 << logger::endl;
+            joint_.reset();
+          }
+        }
+
+        bool test_chord_fire(void) {
+          return joint_->port_invoked(index_);
+        }
+
+        void port_revoked(void) {
+          joint_->port_revoked(index_);
+        }
+      };
+
+    }
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/base/joint.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/joint.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,1143 @@
+//
+// boost/join/joint.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_JOINT_HPP
+#define BOOST_JOIN_JOINT_HPP
+
+#include <vector>
+#include <bitset>
+#include <algorithm>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/ref.hpp>
+#include <boost/mpl/if.hpp>
+#include <boost/join/base/join_base.hpp>
+#include <boost/join/base/exceptions.hpp>
+
+namespace boost {
+  namespace join {
+
+    namespace detail {
+
+      template <size_t size>
+      struct bitmap {
+        enum {
+          max_size = size
+        };
+        typedef typename boost::mpl::if_c<(size<=32), unsigned int, std::bitset<size> >::type bitset_type;
+        bitset_type bitmap_;
+        bitmap(unsigned int val=0) : bitmap_(val) {}
+        void set(bitmap &t) { bitmap_ |= t.bitmap_; }
+        void clear(bitmap &t) { bitmap_ &= ~t.bitmap_; }
+        bool test(bitmap &t) { return (bitmap_ & t.bitmap_) != 0; }
+        bool match(bitmap &b) { return (~bitmap_ & b.bitmap_) == 0; }
+        bool operator==(bitmap &t) { return bitmap_ == t.bitmap_; }
+        size_t num_of_ones(void) {
+          size_t num = 0;
+          for(int i = 0; i<max_size; i++)
+            if (bitmap_ & (1<<i)) num++;
+          return num;
+        }
+      };
+
+      template <typename PortT>
+      typename PortT::argument_type
+      top(PortT *p) {
+        return p->top();
+      }
+    
+      template <typename PortT>
+      std::vector<typename PortT::argument_type>
+      top(std::vector<PortT*> &vp) {
+        std::vector<typename PortT::argument_type> vv;
+        for(size_t i=0; i<vp.size(); i++)
+          vv.push_back(vp[i]->top());
+        return vv;
+      }
+    
+      template <typename PortT>
+      void pop_top(PortT *p) {
+        p->pop_top();
+      }
+    
+      template <typename PortT>
+      void pop_top(std::vector<PortT*> &vp) {
+        for(size_t i=0; i<vp.size(); i++)
+          vp[i]->pop_top();
+      }
+
+      template <typename P> struct arg_type {};
+      template <typename PortT>
+      struct arg_type<PortT*> {
+        typedef typename PortT::argument_type result;
+      };
+
+      template <typename PortT>
+      struct arg_type<std::vector<PortT*> > {
+        typedef std::vector<typename PortT::argument_type> result;
+      };
+
+      template <typename P> struct res_type {};
+      template <typename PortT>
+      struct res_type<PortT*> {
+        typedef typename PortT::result_type result;
+      };
+    
+      template <typename PortT>
+      struct res_type<std::vector<PortT*> > {
+        typedef void result;
+      };
+
+      template <typename PortT>
+      struct impl_ptr {
+        typedef typename PortT::ImplT* result;
+      };
+    
+      template <typename PortT>
+      struct impl_ptr<std::vector<PortT> > {
+        typedef std::vector<typename PortT::ImplT*> result;
+      };
+
+      template <typename PortT>
+      bool is_synch_port(PortT *p) {
+        return p->type_ == port::synch;
+      }
+      template <typename PortT>
+      bool is_synch_port(std::vector<PortT*> &vp) {
+        if (vp[0]->type_ == port::synch)
+          throw single_synch_exception();
+        return false;
+      }
+
+
+      //------- chord definitions ------
+
+      template <size_t max_size>
+      class chord_common {
+      public:
+        bitmap<max_size> mask_;// ports of this chord
+        size_t num_ports_; //for scheduling policy fire_as_much_as_possible
+        int priority_; //sched priority of chord: 0 - max, the bigger the lower
+
+        chord_common(bitmap<max_size> &m, int pri) : 
+          mask_(m), priority_(pri) {
+          num_ports_ = m.num_of_ones();
+        }
+        virtual ~chord_common() {}
+        virtual bool fire(port *caller_port) = 0;
+      };
+
+      //for chords which have a return type (so it must have a synch port)
+      template <typename ResT>
+      class chord_fire : public chord_base<ResT> {
+      public:
+        joint_base *joint_;
+        synch_port<ResT> *synch_p_;
+        chord_fire(joint_base *a, port *p) : joint_(a), synch_p_(0) {
+          synch_p_ = static_cast<synch_port<ResT>*>(p);
+        }
+        bool fire_chord(port *caller_port) {
+          joint_->log.msg("a chord fired");
+          if(caller_port->type_ == port::synch) {
+            //caller is sync, caller_port and synch_p_ should be the same
+            //invoke callback later in the same caller thread
+            synch_p_->add_chord_fired(this);
+            return true;
+          }
+          else {//caller is async_p
+            synch_p_->transfer(this);
+            return false;
+          }
+        }      
+      };
+
+      template <>
+      class chord_fire<void> : public chord_base<void> {
+      public:
+        joint_base *joint_;
+        synch_port<void> *synch_p_;
+        chord_fire(joint_base *a, port *p) : joint_(a), synch_p_(0) {
+          if (is_synch_port(p)) synch_p_ = static_cast<synch_port<void>*>(p);
+        }
+        bool fire_chord(port *caller_port) {
+          joint_->log.msg("a chord fired");
+          if(caller_port->type_ == port::synch) {
+            //caller is sync, caller_port and synch_p_ should be the same
+            //invoke callback later in the same caller thread
+            synch_p_->add_chord_fired(this);
+            return true;
+          }
+          else {//caller is async_p
+            if (synch_p_) {
+              synch_p_->transfer(this);
+              return false;
+            }
+            else { //no sync ports in chord
+              if(joint_->has_spawn()) {
+                //spawn the callback later
+                static_cast<async_port*>(caller_port)->add_chord_fired(this);
+                return true;
+              } else
+                throw no_executor_exception();
+            }
+          }
+          return false;
+        }      
+      };
+
+      template <size_t max_size, typename PortPtrT, typename CallT>
+      class chord1 : public chord_common<max_size>, public chord_fire<typename res_type<PortPtrT>::result> {
+        typedef typename res_type<PortPtrT>::result result_type;
+        typedef typename arg_type<PortPtrT>::result argument_type;
+      public:
+        PortPtrT port_;
+        boost::function1<result_type, argument_type> call_;
+        chord1(bitmap<max_size> &m, joint_base* a, PortPtrT p, CallT c, int pri) : 
+          chord_common<max_size>(m,pri), 
+          chord_fire<result_type>(a,p),
+          port_(p), call_(c) {}
+        void capture_arguments(boost::function0<result_type> &cb) {
+          cb = boost::bind(call_, top(port_));
+          pop_top(port_);
+        }      
+        bool fire(port *caller_port) {
+          return chord_fire<result_type>::fire_chord(caller_port);
+        }
+      };
+
+      template <size_t max_size, typename PortPtrT1, typename PortPtrT2, typename CallT>
+      class chord2 : public chord_common<max_size>, public chord_fire<typename res_type<PortPtrT1>::result> {
+        typedef typename res_type<PortPtrT1>::result result_type;
+        typedef typename arg_type<PortPtrT1>::result argument1_type;
+        typedef typename arg_type<PortPtrT2>::result argument2_type;
+      public:
+        PortPtrT1 port1_;
+        PortPtrT2 port2_;
+        boost::function2<result_type, argument1_type, argument2_type> call_;
+        chord2(bitmap<max_size> &m, joint_base* a, PortPtrT1 p1, PortPtrT2 p2, CallT c, int pri) : 
+          chord_common<max_size>(m,pri), 
+          chord_fire<result_type>(a,p1),
+          port1_(p1), port2_(p2), call_(c) {}
+        void capture_arguments(boost::function0<result_type> &cb) {
+          cb = boost::bind(call_, top(port1_), top(port2_));
+          pop_top(port1_);
+          pop_top(port2_);
+        }      
+        bool fire(port *caller_port) {
+          return chord_fire<result_type>::fire_chord(caller_port);
+        }
+      };
+
+      template <size_t max_size, typename PortPtrT1, typename PortPtrT2, typename PortPtrT3, typename CallT>
+      class chord3 : public chord_common<max_size>, public chord_fire<typename res_type<PortPtrT1>::result> {
+        typedef typename res_type<PortPtrT1>::result result_type;
+        typedef typename arg_type<PortPtrT1>::result argument1_type;
+        typedef typename arg_type<PortPtrT2>::result argument2_type;
+        typedef typename arg_type<PortPtrT3>::result argument3_type;
+      public:
+        PortPtrT1 port1_;
+        PortPtrT2 port2_;
+        PortPtrT3 port3_;
+        boost::function3<result_type, argument1_type, argument2_type, argument3_type>  call_;
+        chord3(bitmap<max_size> &m, joint_base* a, PortPtrT1 p1, PortPtrT2 p2, PortPtrT3 p3, CallT c, int pri) : 
+          chord_common<max_size>(m,pri), 
+          chord_fire<result_type>(a,p1),
+          port1_(p1), port2_(p2), port3_(p3), call_(c) {}
+        void capture_arguments(boost::function0<result_type> &cb) {
+          cb = boost::bind(call_, top(port1_), top(port2_), top(port3_));
+          pop_top(port1_);
+          pop_top(port2_);
+          pop_top(port3_);
+        }      
+        bool fire(port *caller_port) {
+          return chord_fire<result_type>::fire_chord(caller_port);
+        }
+      };
+
+    }
+
+    //------ schedulers ------
+
+    enum schedule_policy {
+      schedule_first_match, //(first match will fire)
+      schedule_longest_match,  //(longest match will fire)
+      schedule_round_robin  //(round robin)
+    };
+    
+    namespace detail {
+
+      template <size_t max_size>
+      struct sched_data_base {
+        port *port_;
+        bitmap<max_size> mask_;
+        sched_data_base() :
+          port_(0), mask_(0) {
+        }
+      };
+        
+      //basic schedulers
+      template <size_t max_size>
+      struct sched_data : public sched_data_base<max_size> {
+        typedef chord_common<max_size> chord_type;
+        std::vector<chord_type*> chords_; //chords this port participates in
+        sched_data() : sched_data_base<max_size>() {}
+        std::vector<chord_type*>  get_chords() { return chords_; }
+        void port_add_chord(chord_type *c, int) {
+          chords_.push_back(c);
+        }
+        void port_del_chord(chord_type *c, int p, bool can_detach) {
+          typename std::vector<chord_type*>::iterator iter;
+          if ((iter = std::find(chords_.begin(), chords_.end(), c)) != chords_.end())
+            chords_.erase(iter);
+          if(chords_.size() == 0) {
+            this->port_->reset();
+            if (can_detach) this->port_->detach_joint();
+            this->port_ = 0;
+          }
+        }
+      };
+    
+        
+      //simple priority based schedulers
+      template <size_t max_size>
+      struct sched_pri_data : public sched_data_base<max_size> {
+        typedef chord_common<max_size> chord_type;
+        std::vector<std::vector<chord_type*> > chords_; //chords this port participates in
+        sched_pri_data() : sched_data_base<max_size>() {
+        }
+        std::vector<chord_type*> get_chords() {
+          std::vector<chord_type*> chds;
+          for(size_t i=0; i<chords_.size(); i++)
+            for(size_t j=0; j<chords_[i].size(); j++)
+              chds.push_back(chords_[i][j]);
+          return chds;
+        }
+        void port_add_chord(chord_type *c, int priority) {
+          if(chords_.size() < ((size_t)priority+1))
+            for(size_t i=chords_.size(); i<((size_t)priority+1); i++) {
+              chords_.push_back(std::vector<chord_type*>());
+            }
+          chords_[priority].push_back(c);
+        }
+        void port_del_chord(chord_type *c, int priority, bool can_detach) {
+          typename std::vector<chord_type*>::iterator iter;
+          if ((iter = std::find(chords_[priority].begin(), chords_[priority].end(), c)) != chords_[priority].end())
+            chords_[priority].erase(iter);
+          bool bound = false;
+          for(size_t i=0; i<chords_.size() && !bound; i++)
+            if(chords_[i].size() > 0) bound = true;
+          if (!bound) {
+            this->port_->reset();
+            if (can_detach) this->port_->detach_joint();
+            this->port_ = 0;
+          }
+        }
+      };
+
+    }
+
+    template <size_t max_size>
+    struct sched_first_match : public detail::sched_data<max_size> {
+      typedef detail::bitmap<max_size> bitmap_t;
+      typedef detail::chord_common<max_size> chord_type;
+      enum { policy = schedule_first_match };
+      chord_type * scan_chords(bitmap_t status_) {
+        chord_type *chord_ready= 0;
+        for (size_t i=0; i<this->chords_.size() && chord_ready == 0; i++) {
+          if (status_.match(this->chords_[i]->mask_))
+            chord_ready = this->chords_[i];
+        }
+        return chord_ready;
+      }
+    };
+    
+    template <size_t max_size>
+    struct sched_longest_match : public detail::sched_data<max_size> {
+      enum { policy = schedule_longest_match };
+      typedef detail::bitmap<max_size> bitmap_t;
+      typedef detail::chord_common<max_size> chord_type;
+      chord_type * scan_chords(bitmap_t status_) {
+        chord_type *chord_ready= 0;
+        size_t chord_size = 0;
+        for(size_t i=0; i<this->chords_.size(); i++) {
+          if(this->chords_[i]->num_ports_ > chord_size && status_.match(this->chords_[i]->mask_)) {
+            chord_ready = this->chords_[i];
+            chord_size = chord_ready->num_ports_;
+          }     
+        }
+        return chord_ready;
+      }
+    };
+    
+    template <size_t max_size>
+    struct sched_round_robin : public detail::sched_data<max_size> {
+      enum { policy = schedule_round_robin };
+      typedef detail::bitmap<max_size> bitmap_t;
+      typedef detail::chord_common<max_size> chord_type;
+      int last_chord_fired_;
+      sched_round_robin() : detail::sched_data<max_size>(), last_chord_fired_(-1) {}
+      chord_type * scan_chords(bitmap_t status_) {
+        chord_type *chord_ready= 0;
+        size_t min = last_chord_fired_+1;
+        for(size_t j=min; j<this->chords_.size() && chord_ready == 0; j++) {
+          if(status_.match(this->chords_[j]->mask_)) {
+            chord_ready = this->chords_[j];
+            last_chord_fired_ = (int)j;
+          }
+        }
+        if (chord_ready == 0 && min > 0) 
+          for(size_t j=0; j<min && chord_ready == 0; j++) {
+            if(status_.match(this->chords_[j]->mask_)) {
+              chord_ready = this->chords_[j];
+              last_chord_fired_ = (int)j;
+            }
+          }
+        return chord_ready;
+      }
+    };
+    
+    template <size_t max_size>
+    struct sched_pri_first_match : public detail::sched_pri_data<max_size> {
+      enum { policy = schedule_first_match };
+      typedef detail::bitmap<max_size> bitmap_t;
+      typedef detail::chord_common<max_size> chord_type;
+      chord_type * scan_chords(bitmap_t status_) {
+        chord_type *chord_ready= 0;
+        for(size_t i=0; i<this->chords_.size() && chord_ready == 0; i++) 
+          for(size_t j=0; j<this->chords_[i].size() && chord_ready == 0; j++) {
+            if(status_.match(this->chords_[i][j]->mask_))
+              chord_ready = this->chords_[i][j];
+          }
+        return chord_ready;
+      }
+    };
+    
+    template <size_t max_size>
+    struct sched_pri_longest_match : public detail::sched_pri_data<max_size> {
+      enum { policy = schedule_longest_match };
+      typedef detail::bitmap<max_size> bitmap_t;
+      typedef detail::chord_common<max_size> chord_type;
+      chord_type * scan_chords(bitmap_t status_) {
+        chord_type *chord_ready= 0;
+        for(size_t i=0; i<this->chords_.size() && chord_ready == 0; i++) {
+          size_t chord_size = 0;
+          for(size_t j=0; j<this->chords_[i].size(); j++) {
+            if(this->chords_[i][j]->num_ports_ > chord_size) {
+              if(status_.match(this->chords_[i][j]->mask_)) {
+                chord_ready = this->chords_[i][j];
+                chord_size = chord_ready->num_ports_;
+              }
+            }     
+          }
+        }
+        return chord_ready;
+      }
+    };
+    
+    template <size_t max_size>
+    struct sched_pri_round_robin : public detail::sched_pri_data<max_size> {
+      enum { policy = schedule_round_robin };
+      typedef detail::bitmap<max_size> bitmap_t;
+      typedef detail::chord_common<max_size> chord_type;
+      std::vector<int> last_chord_fired_; //for roundrobin dispatching
+      sched_pri_round_robin() : detail::sched_pri_data<max_size>() {
+        last_chord_fired_.push_back(-1);
+      }
+      void port_add_chord(chord_type *c, int priority) {
+        if(this->chords_.size() < ((size_t)priority+1))
+          for(size_t i=this->chords_.size(); i<((size_t)priority+1); i++) {
+            this->chords_.push_back(std::vector<chord_type*>());
+            last_chord_fired_.push_back(-1);
+          }
+        this->chords_[priority].push_back(c);
+      }
+      void port_del_chord(chord_type *c, int priority, bool can_detach) {
+        typename std::vector<chord_type*>::iterator iter;
+        if ((iter = std::find(this->chords_[priority].begin(), this->chords_[priority].end(), c)) != this->chords_[priority].end()) {
+          this->chords_[priority].erase(iter);
+          last_chord_fired_[priority] = -1;
+        }
+        bool bound = false;
+        for(size_t i=0; i<this->chords_.size() && !bound; i++)
+          if(this->chords_[i].size() > 0) bound = true;
+        if (!bound) {
+          this->port_->reset();
+          if(can_detach) this->port_->detach_joint();
+          this->port_ = 0;
+        }
+      }
+      chord_type * scan_chords(bitmap_t status_) {
+        chord_type *chord_ready= 0;
+        for(size_t i=0; i<this->chords_.size() && chord_ready == 0; i++) 
+          if(this->chords_[i].size()>0) {
+            size_t min = last_chord_fired_[i]+1;
+            for(size_t j=min; j<this->chords_[i].size() && chord_ready == 0; j++) {
+              if(status_.match(this->chords_[i][j]->mask_)) {
+                chord_ready = this->chords_[i][j];
+                last_chord_fired_[i] = (int)j;
+              }
+            }
+            if (chord_ready == 0 && min > 0) 
+              for(size_t j=0; j<min && chord_ready == 0; j++) {
+                if(status_.match(this->chords_[i][j]->mask_)) {
+                  chord_ready = this->chords_[i][j];
+                  last_chord_fired_[i] = (int)j;
+                }
+              }
+          }
+        return chord_ready;
+      }
+    };
+
+    //------ joint_impl definition ------
+    
+    namespace detail {
+
+      template <
+        template <size_t> class scheduler=sched_first_match, 
+        size_t max_size=32
+        >
+      class joint_impl : public joint_base, private boost::noncopyable {
+        template <size_t> friend class chord_common;
+        friend class port;
+
+      public:
+        typedef bitmap<max_size> bitmap_t;
+        typedef scheduler<max_size> sched_type;
+        typedef typename joint_base::callable callable;
+        typedef function1<void, typename joint_base::callable> spawn_type;
+        typedef chord_common<max_size> chord_type;
+        typedef sched_type port_data;
+
+      private:
+
+        spawn_type spawn_;
+        bitmap_t status_; //bitmap marking which port is active
+        int heartbeat_;
+        std::vector<port_data> ports_; //all unique ports in this joint, joint not owning
+        //them and will not destroy them, joint will destroy port_data
+        std::vector<boost::shared_ptr<chord_type> > chords_; //joint owns chords_ and will destroy them
+
+      public:
+        joint_impl(spawn_type s = 0, 
+                   int hb = 0, //forever
+                   const char *name = 0) : 
+          joint_base(name), spawn_(s), status_(0), heartbeat_(hb) {}
+
+        ~joint_impl() {
+          log.msg("joint is being destructed...");
+          reset();
+          log.msg("joint is dead now...");
+        }
+
+        void reset() {
+          log.msg("joint reset...");
+          boost::mutex::scoped_lock lock(mutex_);
+          internal_reset();
+        }
+
+        bool has_spawn(void) { return spawn_ != 0; }
+        void spawn(callable c) {
+          spawn_(c);
+        }
+
+        schedule_policy my_scheduler(void) { return static_cast<schedule_policy>(sched_type::policy); }
+      
+        bool check_heartbeat() {
+          if (heartbeat_ > 0) {
+            heartbeat_--;
+            if (heartbeat_ == 0) {
+              log.msg("heartbeat expires, joint reset...");
+              internal_reset();
+              return true;
+            }
+          }
+          return false;
+        }
+
+      private:
+        void internal_reset() {
+          chords_remove(); //remove all remaining chords
+          ports_.clear();
+          chords_.clear();
+          status_ = bitmap_t(0);
+        }
+
+        void port_remove(int ind) {
+          boost::mutex::scoped_lock lock(mutex_);
+          log.stream() << "port_remove: port [" << ind << "]" << logger::endl;
+          port_data &pd(ports_[ind]);
+          std::vector<chord_type*> chds = pd.get_chords();
+          for(size_t i=0; i<chds.size(); i++) {
+            chord_type *chd = chds[i];
+            for(size_t j=0; j<ports_.size(); j++) {
+              if(ports_[j].port_ != 0 && chd->mask_.test(ports_[j].mask_)) {
+                if((size_t)ind == j)
+                  ports_[j].port_del_chord(chd, chd->priority_, false);
+                else
+                  ports_[j].port_del_chord(chd, chd->priority_, true);
+              }
+            }
+            del_chord(chd->mask_);
+          }
+        }
+
+        void chords_remove() {
+          typename std::vector<boost::shared_ptr<chord_type> >::iterator iter;
+          for(iter = chords_.begin(); iter != chords_.end(); ) {
+            boost::shared_ptr<chord_type> chd = *iter;
+            for(size_t j=0; j<ports_.size(); j++) {
+              if (ports_[j].port_ != 0 && chd->mask_.test(ports_[j].mask_)) {
+                ports_[j].port_del_chord(chd.get(), chd->priority_, true);
+              }
+            }
+            log.msg("one chord removed");
+            iter = chords_.erase(iter);
+          }
+        }
+
+        //ports call this to notify that a new msg comes
+        //return: true - chord fired, false - chord not fired
+        bool port_invoked(int ind) {
+          port_data &pd(ports_[ind]);
+          pd.port_->num_pending_++;
+          if(status_.test(pd.mask_)) {
+            //already set
+            //log.msg("port_invoked: a port add more pending calls");
+            log.stream() << "port_invoked: a port [" << ind << "] add more pending calls" << logger::endl;
+            //do nothing
+          } else {
+            status_.set(pd.mask_);
+            //log.msg("port_invoked: a empty port get a call");
+            log.stream() << "port_invoked: a empty port [" << ind << "] get a call" << logger::endl;
+            chord_type *c = pd.scan_chords(status_); //find chord to fire based on dispatch_policy
+            if(c != 0) {
+              //update msg arrival status first, mark msgs to be consumed as
+              //"unavailable", kinds of "reserve" these msgs
+              //so that thread-scheduling will not interfere: another thread may come in
+              //between try to take some of these msgs
+              update_status(c->mask_);
+              return c->fire(pd.port_);
+            }
+          }
+          return false; //no chord fired
+        }
+
+        //one msg will be taken from each port in mask
+        //query these ports to see if any msg remaininf and
+        //if its bit in status_ should be flipped
+        void update_status(bitmap_t &chord_mask) {
+          for(size_t i=0; i<ports_.size(); i++)
+            if(chord_mask.test(ports_[i].mask_)) {
+              ports_[i].port_->num_pending_--;
+              if(ports_[i].port_->num_pending_ == 0)
+                status_.clear(ports_[i].mask_);
+            }
+        }
+
+        void port_revoked(int ind) {
+          port_data &pd(ports_[ind]);
+          pd.port_->num_pending_--;
+          if(pd.port_->num_pending_ == 0)
+            status_.clear(pd.mask_);
+        }
+
+        //
+        // utils methods for creating chords
+        //
+        template <typename PortT>
+        void add_port(PortT *pp, bitmap_t &bmap) {
+          port_data pd;
+          int ind = -1;
+          int empty = -1;
+          for(size_t i=0; i<ports_.size() && ind == -1;i++)
+            if(pp == ports_[i].port_) {
+              ind = (int)i;
+              pd = ports_[i];
+            }
+            else if (ports_[i].port_ == 0 && empty == -1)
+              empty = i;
+          if (ind == -1) {
+            if (empty >= 0) ind = empty;
+            else ind = (int)ports_.size();
+            if((size_t)ind >= max_size) {
+              log.msg("too_many_ports_exception thrown");
+              throw too_many_ports_exception();
+            }
+            pd.port_ = pp;
+            pd.mask_ = bitmap_t(1<<ind);
+            ports_.push_back(pd);
+            pp->index_ = ind;
+            pp->num_pending_ = 0;
+          }
+          bmap.set(pd.mask_);
+        }
+        template <typename PortT>
+        void add_port(std::vector<PortT*> &vp, bitmap_t &bmap) {
+          for(size_t i=0; i<vp.size(); i++) 
+            add_port(vp[i], bmap);
+        }
+        template <typename PortT>
+        void find_port(PortT *p, bitmap_t &bmap) {
+          bmap.set(ports_[p->index_].mask_);
+        }
+        template <typename PortT>
+        void find_port(std::vector<PortT*> &vp, bitmap_t &bmap) {
+          for(size_t i=0; i<vp.size(); i++) 
+            find_port(vp[i], bmap);
+        }
+        template <typename PortT>
+        void port_add_chord(PortT *p, chord_type *c, int priority) {
+          port_data &pd(ports_[p->index_]);
+          pd.port_add_chord(c, priority);
+        }
+        template <typename PortT>
+        void port_add_chord(std::vector<PortT*> &vp, chord_type *c, int priority) {
+          for(size_t i=0; i<vp.size(); i++)
+            port_add_chord(vp[i], c, priority);
+        }
+        template <typename PortT>
+        void port_del_chord(PortT *p, chord_type *c, int priority) {
+          port_data &pd(ports_[p->index_]);
+          pd.port_del_chord(c, priority, true);
+        }
+        template <typename PortT>
+        void port_del_chord(std::vector<PortT*> &vp, chord_type *c, int priority) {
+          for(size_t i=0; i<vp.size(); i++)
+            port_del_chord(vp[i], c, priority);
+        }
+        bool find_chord(bitmap_t &bmap, boost::shared_ptr<chord_type> &cd) {
+          for(size_t i=0; i<chords_.size(); i++) {
+            if (chords_[i]->mask_ == bmap) {
+              cd = chords_[i];
+              return true;
+            }
+          }
+          return false;
+        }
+        
+        void del_chord(bitmap_t &bmap) {
+          log.msg("del_chord...");
+          typename std::vector<boost::shared_ptr<chord_type> >::iterator iter;
+          for(iter = chords_.begin(); iter != chords_.end(); iter++) {
+            if ((*iter)->mask_ == bmap) {
+              chords_.erase(iter);
+              return;
+            }
+          }
+        }
+        
+        bool del_chord(bitmap_t &bmap, boost::shared_ptr<chord_type> &cd) {
+          typename std::vector<boost::shared_ptr<chord_type> >::iterator iter;
+          for(iter = chords_.begin(); iter != chords_.end(); iter++) {
+            if ((*iter)->mask_ == bmap) {
+              cd = (*iter);
+              chords_.erase(iter);
+              return true;
+            }
+          }
+          return false;
+        }
+        void chord_data_validation(bitmap_t &bmap) {
+          if (static_cast<schedule_policy>(sched_type::policy) == schedule_first_match) {
+            //check_hidden_chord(bitmap_t &bmap)
+            for(size_t i=0; i<chords_.size(); i++) {
+              if (chords_[i]->mask_.match(bmap) || bmap.match(chords_[i]->mask_)) 
+                throw hidden_chord_exception();
+            }
+          }
+          boost::shared_ptr<chord_type> cd;
+          if(find_chord(bmap, cd))
+            throw hidden_chord_exception();
+        }
+
+      public:
+
+        //
+        // ***** "factory" methods to create chords ******
+        //
+        //--- chord with 1 port ---
+        template <typename PortPtrT, typename CallT>
+        void chord(PortPtrT p, CallT c, int priority) {
+          bitmap_t bmap;
+          add_port(p, bmap);
+          is_synch_port(p); // check against vector of synch
+          chord_data_validation(bmap);
+          boost::shared_ptr<chord_type> cd(new chord1<max_size,PortPtrT,CallT>(bmap, this, p, c, priority));
+          chords_.push_back(cd);
+          port_add_chord(p,cd.get(),priority);
+        }
+
+        template <typename PortPtrT>
+        void chord_remove(PortPtrT p) {
+          bitmap_t bmap;
+          find_port(p, bmap);
+          boost::shared_ptr<chord_type> cd;
+          if(!del_chord(bmap,cd))
+            throw chord_remove_exception();
+          port_del_chord(p, cd.get(), cd->priority_);
+        }
+
+        template <typename PortPtrT, typename CallT>
+        void chord_override(PortPtrT p, CallT c, int priority) {
+          bitmap_t bmap;
+          find_port(p, bmap); 
+          boost::shared_ptr<chord_type> cd;
+          if(find_chord(bmap, cd)) { //override chord
+            chord1<max_size,PortPtrT,CallT> *cdp = static_cast<chord1<max_size,PortPtrT,CallT>*>(cd.get());
+            cdp->call_ = c;
+            if (cdp->priority_ != priority) {
+              port_del_chord(p, cdp, cdp->priority_);
+              port_add_chord(p, cdp, priority);
+              cdp->priority_ = priority;
+            }
+          }
+          else
+            throw chord_override_exception();
+        }
+
+        // ---- chord with 2 ports ----
+        template <typename PortPtrT1, typename PortPtrT2, typename CallT>
+        void chord(PortPtrT1 p1, PortPtrT2 p2, CallT c, int priority)
+        {
+          log.msg("joint_impl::chord 2 ports");
+          bitmap_t bmap;
+          is_synch_port(p1); // check against vector of synch
+          if (is_synch_port(p2))
+            throw synch_not_1st_exception();
+          add_port(p1, bmap);
+          add_port(p2, bmap);
+          chord_data_validation(bmap);
+          boost::shared_ptr<chord_type> cd(new chord2<max_size,PortPtrT1,PortPtrT2,CallT>(bmap, this, p1, p2, c, priority));
+          chords_.push_back(cd);
+          port_add_chord(p1,cd.get(),priority);
+          port_add_chord(p2,cd.get(),priority);
+        }
+
+        template <typename PortPtrT1, typename PortPtrT2>
+        void chord_remove(PortPtrT1 p1, PortPtrT2 p2) {
+          bitmap_t bmap;
+          find_port(p1, bmap); 
+          find_port(p2, bmap);
+          boost::shared_ptr<chord_type> cd;
+          if(!del_chord(bmap, cd))
+            throw chord_remove_exception();
+          port_del_chord(p1, cd.get(), cd->priority_);
+          port_del_chord(p2, cd.get(), cd->priority_);
+        }
+
+        template <typename PortPtrT1, typename PortPtrT2, typename CallT>
+        void chord_override(PortPtrT1 p1, PortPtrT2 p2, CallT c, int priority) {
+          bitmap_t bmap;
+          find_port(p1, bmap);
+          find_port(p2, bmap);
+          boost::shared_ptr<chord_type> cd;
+          if(find_chord(bmap, cd)) { //override chord
+            chord2<max_size,PortPtrT1,PortPtrT2,CallT>* cdp = static_cast<chord2<max_size,PortPtrT1,PortPtrT2,CallT>*>(cd.get());
+            cdp->call_ = c;
+            if (cdp->priority_ != priority) {
+              port_del_chord(p1, cdp, cdp->priority_);
+              port_del_chord(p2, cdp, cdp->priority_);
+              port_add_chord(p1, cdp, priority);
+              port_add_chord(p2, cdp, priority);
+              cdp->priority_ = priority;
+            }
+          }
+          else
+            throw chord_override_exception();
+        }
+
+        //---- chord with 3 ports ---
+
+        template <typename PortPtrT1, typename PortPtrT2, typename PortPtrT3, typename CallT>
+        void chord(PortPtrT1 p1, PortPtrT2 p2, PortPtrT3 p3, CallT c, int priority)
+        {
+          bitmap_t bmap;
+          is_synch_port(p1); // check against vector of synch
+          if (is_synch_port(p2) || is_synch_port(p3))
+            throw synch_not_1st_exception();
+          add_port(p1, bmap);
+          add_port(p2, bmap);
+          add_port(p3, bmap);
+          chord_data_validation(bmap);
+          boost::shared_ptr<chord_type> cd(new chord3<max_size,PortPtrT1,PortPtrT2,PortPtrT3,CallT>(bmap, this, p1, p2, p3, c, priority));
+          chords_.push_back(cd);
+          port_add_chord(p1,cd.get(),priority);
+          port_add_chord(p2,cd.get(),priority);
+          port_add_chord(p3,cd.get(),priority);
+        }
+
+        template <typename PortPtrT1, typename PortPtrT2, typename PortPtrT3>
+        void chord_remove(PortPtrT1 &p1, PortPtrT2 &p2, PortPtrT3 &p3) {
+          bitmap_t bmap;
+          find_port(p1, bmap);
+          find_port(p2, bmap);
+          find_port(p3, bmap);
+          boost::shared_ptr<chord_type> cd;
+          if(!del_chord(bmap, cd))
+            throw chord_remove_exception();
+          port_del_chord(p1, cd.get(), cd->priority_);
+          port_del_chord(p2, cd.get(), cd->priority_);
+          port_del_chord(p3, cd.get(), cd->priority_);
+        }
+
+        template <typename PortPtrT1, typename PortPtrT2, typename PortPtrT3, typename CallT>
+        void chord_override(PortPtrT1 p1, PortPtrT2 p2, PortPtrT3 p3, CallT c, int priority) {
+          bitmap_t bmap;
+          find_port(p1, bmap);
+          find_port(p2, bmap);
+          find_port(p3, bmap);       
+          boost::shared_ptr<chord_type> cd;
+          if(find_chord(bmap, cd)) { //override chord
+            chord3<max_size,PortPtrT1,PortPtrT2,PortPtrT3,CallT>* cdp = static_cast<chord3<max_size,PortPtrT1,PortPtrT2,PortPtrT3,CallT>*>(cd.get());
+            cdp->call_ = c;
+            if (cdp->priority_ != priority) {
+              port_del_chord(p1, cdp, cdp->priority_);
+              port_del_chord(p2, cdp, cdp->priority_);
+              port_del_chord(p3, cdp, cdp->priority_);
+              port_add_chord(p1, cdp, priority);
+              port_add_chord(p2, cdp, priority);
+              port_add_chord(p3, cdp, priority);
+              cdp->priority_ = priority;
+            }
+          }
+          else
+            throw chord_override_exception();
+        }
+
+      };
+
+    }
+
+    template <
+      template <size_t> class scheduler=sched_first_match, 
+      size_t max_size=32
+      >
+    class joint_t {
+    public:
+      typedef detail::joint_impl<scheduler, max_size> ImplT;
+      typedef typename ImplT::sched_type sched_type;
+      typedef typename ImplT::callable callable;
+      typedef typename ImplT::spawn_type spawn_type;
+
+    private:
+      boost::shared_ptr<ImplT> pimpl_;
+
+      template <typename PortT>
+      typename PortT::ImplT* get_impl_ptr(PortT &p) {
+        if (p.pimpl_ == 0 || p.pimpl_->joint_ == 0) {
+          log->msg("a new port impl is created");
+          p.pimpl_.reset(new typename PortT::ImplT(p.max_size_, pimpl_));
+        }
+        if(p.pimpl_->joint_ != 0 && p.pimpl_->joint_ != pimpl_) 
+          throw double_association_exception();
+        return p.pimpl_.get();
+      }
+
+      template <typename PortT>
+      std::vector<typename PortT::ImplT*> get_impl_ptr(std::vector<PortT> &vp) {
+        if (vp[0].pimpl_ == 0 || vp[0].pimpl_->joint_ == 0) {
+          for(size_t i=0; i<vp.size(); i++)
+            vp[i].pimpl_.reset(new typename PortT::ImplT(vp[i].max_size_, pimpl_));
+        }
+        if(vp[0].pimpl_->joint_ != 0 && vp[0].pimpl_->joint_ != pimpl_) 
+          throw double_association_exception();
+        std::vector<typename PortT::ImplT*> vimpl;
+        for(size_t i=0; i<vp.size(); i++)
+          vimpl.push_back(vp[i].pimpl_.get());
+        return vimpl;
+      }
+
+      template <typename PortT, typename exception_type>
+      typename PortT::ImplT* check_port(PortT &p) {
+        if(p.pimpl_ == 0 || p.pimpl_->joint_ == 0 || p.pimpl_->joint_ != pimpl_) 
+          throw new exception_type();
+        return p.pimpl_.get();
+      }
+
+      template <typename PortT, typename exception_type>
+      std::vector<typename PortT::ImplT*> check_port(std::vector<PortT> &vp) {
+        std::vector<typename PortT::ImplT*> vimpl;
+        for(size_t i=0; i<vp.size(); i++) 
+          vimpl.push_back(check_port<PortT, exception_type>(vp[i]));
+        return vimpl;
+      }
+
+    public:
+      logger *log;
+
+      joint_t(spawn_type s = 0, 
+              int hb = 0, //forever
+              const char *name = 0) : pimpl_(new ImplT(s, hb, name)), log(&(pimpl_->log)) {
+        log->msg("joint_t constructor");
+      }
+
+      schedule_policy my_scheduler(void) { 
+        return pimpl_->my_scheduler(); 
+      }
+
+      void reset() { pimpl_->reset(); }
+
+      //
+      // ***** "factory" methods to create chords ******
+      //
+      //--- chord with 1 port ---
+      template <typename PortT, typename CallT>
+      joint_t& chord(PortT &p, CallT c, int priority=0) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT>::result pp = get_impl_ptr(p);
+        pimpl_->chord(pp, c, priority);
+        return *this;
+      }
+
+      template <typename PortT>
+      joint_t& chord_remove(PortT &p) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT>::result pp = check_port<PortT,chord_remove_exception>(p);
+        pimpl_->chord_remove(pp);
+        return *this;
+      }
+
+      template <typename PortT, typename CallT>
+      joint_t& chord_override(PortT &p, CallT c, int priority=0) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT>::result pp = check_port<PortT,chord_override_exception>(p);
+        pimpl_->chord_override(pp, c, priority);
+        return *this;
+      }
+
+      //wrappers for pointers to member methods as chord body
+      template <typename PortT, typename ActorT, typename R, typename ArgT>
+      joint_t& chord(PortT &p, 
+                 R (ActorT::*c)(ArgT), 
+                 int priority=0) {
+        return chord(p, boost::bind(c, static_cast<ActorT*>(this), ::_1), priority);
+      }
+      template <typename PortT, typename ActorT, typename R, typename ArgT>
+      joint_t& chord_override(PortT &p, 
+                          R (ActorT::*c)(ArgT), 
+                          int priority=0) {
+        return chord_override(p, boost::bind(c, static_cast<ActorT*>(this), ::_1), priority);
+      }
+
+
+      // ---- chord with 2 ports ----
+      template <typename PortT1, typename PortT2, typename CallT>
+      joint_t& chord(PortT1 &p1, PortT2 &p2, CallT c, int priority=0)
+      {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT1>::result pp1 = get_impl_ptr(p1);
+        typename detail::impl_ptr<PortT2>::result pp2 = get_impl_ptr(p2);
+        pimpl_->chord(pp1, pp2, c, priority);
+        return *this;
+      }
+
+      template <typename PortT1, typename PortT2>
+      joint_t& chord_remove(PortT1 &p1, PortT2 &p2) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_remove_exception>(p1);
+        typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_remove_exception>(p2);
+        pimpl_->chord_remove(pp1, pp2);
+        return *this;
+      }
+
+      template <typename PortT1, typename PortT2, typename CallT>
+      joint_t& chord_override(PortT1 &p1, PortT2 &p2, CallT c, int priority=0) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_override_exception>(p1);
+        typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_override_exception>(p2);
+        pimpl_->chord_override(pp1, pp2, c, priority);
+        return *this;
+      }
+
+      //wrappers for pointers to member methods as chord body
+      template <typename PortT1, typename PortT2, typename ActorT, typename R, typename ArgT1, typename ArgT2>
+      joint_t& chord(PortT1 &p1, PortT2 &p2, 
+                 R (ActorT::*c)(ArgT1, ArgT2), int priority=0) {
+        return chord(p1, p2, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2), priority);
+      }
+      template <typename PortT1, typename PortT2, typename ActorT, typename R, typename ArgT1, typename ArgT2>
+      joint_t& chord_override(PortT1 &p1, PortT2 &p2, 
+                          R (ActorT::*c)(ArgT1, ArgT2), int priority=0) {
+        return chord_override(p1, p2, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2), priority);
+      }
+
+      //---- chord with 3 ports ---
+
+      template <typename PortT1, typename PortT2, typename PortT3, typename CallT>
+      joint_t& chord(PortT1 &p1, PortT2 &p2, PortT3 &p3, CallT c, int priority=0)
+      {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT1>::result pp1 = get_impl_ptr(p1);
+        typename detail::impl_ptr<PortT2>::result pp2 = get_impl_ptr(p2);
+        typename detail::impl_ptr<PortT3>::result pp3 = get_impl_ptr(p3);
+        pimpl_->chord(pp1, pp2, pp3, c, priority);
+        return *this;
+      }
+
+      template <typename PortT1, typename PortT2, typename PortT3>
+      joint_t& chord_remove(PortT1 &p1, PortT2 &p2, PortT3 &p3) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_remove_exception>(p1);
+        typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_remove_exception>(p2);
+        typename detail::impl_ptr<PortT3>::result pp3 = check_port<PortT3,chord_remove_exception>(p3);
+        pimpl_->chord_remove(pp1, pp2, pp3);
+        return *this;
+      }
+
+      template <typename PortT1, typename PortT2, typename PortT3, typename CallT>
+      joint_t& chord_override(PortT1 &p1, PortT2 &p2, PortT3 &p3, CallT c, int priority=0) {
+        boost::mutex::scoped_lock lock(pimpl_->mutex_);
+        typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_override_exception>(p1);
+        typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_override_exception>(p2);
+        typename detail::impl_ptr<PortT3>::result pp3 = check_port<PortT3,chord_override_exception>(p3);
+        pimpl_->chord_override(pp1, pp2, pp3, c, priority);
+        return *this;
+      }
+
+      //wrappers for pointers to member methods as chord body
+      template <typename PortT1, typename PortT2, typename PortT3, typename ActorT, typename R, typename ArgT1, typename ArgT2, typename ArgT3>
+      joint_t& chord(PortT1 &p1, PortT2 &p2, PortT3 &p3, 
+                 R (ActorT::*c)(ArgT1, ArgT2, ArgT3), int priority=0) {
+        return chord(p1, p2, p3, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2, ::_3), priority);
+      }
+      template <typename PortT1, typename PortT2, typename PortT3, typename ActorT, typename R, typename ArgT1, typename ArgT2, typename ArgT3>
+      joint_t& chord_override(PortT1 &p1, PortT2 &p2, PortT3 &p3, 
+                          R (ActorT::*c)(ArgT1, ArgT2, ArgT3), int priority=0) {
+        return chord_override(p1, p2, p3, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2, ::_3)), priority;
+      }
+
+    };
+
+    class joint : public joint_t<> {
+    public:
+      joint(spawn_type s = 0, 
+            int hb = 0,
+            const char *name = 0) : 
+        joint_t<>(s,hb,name) {}
+    };
+
+    //utility function to define large/wide joint
+    template <
+      template <size_t> class scheduler, 
+      size_t max_size
+      >
+    joint_t<scheduler, max_size> joins_t(joint::spawn_type s = 0, int hb = 0, const char *name = 0) {
+      return joint_t<scheduler, max_size>(s,hb,name);
+    }
+
+    //utility function to define small/default joint
+    joint joins(joint::spawn_type s = 0, int hb = 0, const char *name = 0) {
+      return joint(s,hb,name);
+    }
+
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/base/port.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/port.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,706 @@
+//
+// Boost/join/port.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_PORT_HPP
+#define BOOST_JOIN_PORT_HPP
+
+#include <exception>
+#include <stdexcept>
+#include <deque>
+#include <iostream>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/condition.hpp> 
+#include <boost/join/base/join_base.hpp>
+#include <boost/join/base/exceptions.hpp>
+
+namespace boost {
+  namespace join {
+
+    struct void_t {};
+
+    namespace detail {
+
+      class joint_base;
+      template <typename ResT> class chord_base;
+
+      class async_port : public port {
+      public:
+        async_port(shared_ptr<joint_base> j) : port(port::async, j), chord_fired_(0) {}
+        void add_chord_fired(chord_base<void> *c) { chord_fired_ = c; }
+      protected:
+        chord_base<void> *chord_fired_;
+      };
+
+      //QueT supports push_back()/pop_front()/front()
+      template <typename ArgT, typename QueT> 
+      class async_impl1 : public async_port, private boost::noncopyable {
+      public:
+        typedef ArgT argument_type;
+        typedef void result_type;
+
+        async_impl1(size_t sz, shared_ptr<joint_base> j) : async_port(j), max_size_(sz), arg1_(0) {//sz==0 means unlimited
+        } 
+
+        void operator()(ArgT t) {
+          boost::shared_ptr<joint_base> my_joint(joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          my_joint->log.msg("async_impl1::operator(arg) enter");
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          if(test_chord_fire()) {  //notify joint about a new msg
+            arg1_ = &t; //setup my arg
+            boost::function0<void> cb;
+            chord_fired_->capture_arguments(cb);
+            chord_fired_ = 0;
+            my_joint->log.msg("a new async task spawned");
+            my_joint->check_heartbeat();
+            lock.unlock();
+            my_joint->spawn(cb); //"spawn" (from executor) has its own lock protection
+            return;
+          }
+          else {
+            my_joint->log.msg("async_impl1::operator(arg) arg queued");
+            arg_.push_back(t);
+          }
+        }
+        void put(ArgT t) {
+          operator()(t);
+        }
+        ArgT &top(void) {
+          if(arg1_ != 0)
+            return *arg1_;
+          else
+            return arg_.front();
+        }
+        void pop_top(void) {
+          if (arg1_ != 0)
+            arg1_ = 0;
+          else
+            arg_.pop_front();
+        }
+        void reset(void) {
+          port::reset();
+          arg_.clear();
+          arg1_ = 0;
+        }
+      private:
+        size_t max_size_;
+        QueT arg_;
+        ArgT *arg1_;
+      };
+
+      class async_impl0 : public async_port, private boost::noncopyable {
+      public:
+        typedef void_t argument_type;
+        typedef void result_type;
+
+        async_impl0(size_t sz, shared_ptr<joint_base> j) : async_port(j), max_size_(sz) {
+        } 
+
+        void operator()(void) {
+          boost::shared_ptr<joint_base> my_joint(joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          my_joint->log.msg("async_impl0::operator(void) enter");
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+        
+          if(test_chord_fire()) {  
+            boost::function0<void> cb;
+            chord_fired_->capture_arguments(cb);
+            chord_fired_ = 0;
+            my_joint->log.msg("a new async task spawned");
+            my_joint->check_heartbeat();
+            lock.unlock();
+            my_joint->spawn(cb);
+            return;
+          }
+          my_joint->log.msg("async_impl0::operator(void) exit");
+        }
+        void put(void) {
+          operator()();
+        }
+        void pop_top(void) {
+        }
+        void_t top(void) {
+          return void_t();
+        }
+
+      private:
+        size_t max_size_;
+      };
+
+      template <typename ResT>
+      class synch_port : public port {
+      public:
+        synch_port(shared_ptr<joint_base> j) : port(port::synch, j), num_waiting_(0), reset_(false), chord_fired_(0) {}
+
+        void add_chord_fired(chord_base<ResT>* c) {
+          if (chord_fired_ == 0)
+            chord_fired_ = c;
+          else
+            chords_fired_.push_back(c);
+        }
+
+        //async call will call this to transfer control
+        void transfer(chord_base<ResT>* c) {
+          add_chord_fired(c);
+          if (num_waiting_ > 0)
+            cond_.notify_one();     
+          joint_->log.msg("transfer");
+        }
+        chord_base<ResT> * get_chord_fired(void) {
+          chord_base<ResT> *cd = 0;
+          if (chord_fired_ != 0) {
+            cd = chord_fired_;
+            chord_fired_ = 0;
+          }
+          else {
+            if (!chords_fired_.empty()) { //assigned to complete chord callback
+              cd = chords_fired_.front();
+              chords_fired_.pop_front();
+            } 
+          }
+          return cd;
+        }
+
+        void wait_fired(boost::mutex::scoped_lock &lock, chord_base<ResT> *&chd) {
+          num_waiting_++;
+          cond_.wait(lock);
+          num_waiting_--;
+          if (reset_) {
+            if (num_waiting_ == 0)
+              reset_ = false;
+            throw port_chord_reset_exception();
+          }
+          chd = get_chord_fired();
+        }
+
+        void timed_wait_fired(boost::mutex::scoped_lock &lock, chord_base<ResT> *&chd, const boost::xtime& timeout) {
+          num_waiting_++;
+          if(!cond_.timed_wait(lock, timeout) && ((chd = get_chord_fired()) == 0)) {
+            port_revoked();
+            num_waiting_--;
+            throw synch_time_out_exception();
+          }
+          num_waiting_--;
+          if (reset_) {
+            if (num_waiting_ == 0)
+              reset_ = false;
+            throw port_chord_reset_exception();
+          }
+          if (chd == 0)
+            chd = get_chord_fired();
+        }
+
+        void reset(void) {
+          port::reset();
+          chord_fired_ = 0;
+          chords_fired_.clear();
+          reset_ = true;
+          //tell all blocking threads to exit
+          if (num_waiting_ > 0)
+            cond_.notify_all();     
+          //joint_->log.msg("reset");
+        }
+
+      protected:
+        boost::condition cond_;
+        size_t num_waiting_;
+        bool reset_;
+        chord_base<ResT> *chord_fired_;
+        std::deque<chord_base<ResT>*> chords_fired_;
+      };
+
+      template <typename ResT, typename ArgT>
+      class synch_impl : public synch_port<ResT>, private boost::noncopyable {
+      public:
+        typedef ArgT argument_type;
+        typedef ResT result_type;
+
+        synch_impl(size_t sz, shared_ptr<joint_base> j) :  synch_port<ResT>(j), max_size_(sz), arg_(0) {//sz==0 means unlimited
+        } 
+
+        ResT operator()(ArgT t) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<ResT, ArgT>::operator(t) enter");
+
+          chord_base<ResT> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<resT,argT> wait");
+            this->wait_fired(lock, chd);
+          }
+          my_joint->log.msg("synch<resT,argT> fired");
+          arg_ = &t; //setup my arg
+          boost::function0<ResT> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          return cb();  //invoke callback in my thread
+        }
+
+        ResT put(ArgT t) {
+          return operator()(t);
+        }
+
+        ResT operator()(ArgT t, const boost::xtime& timeout) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<ResT, ArgT>::operator(t, timeout) enter");
+
+          chord_base<ResT> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<resT,argT> timed wait");
+            this->timed_wait_fired(lock, chd, timeout);
+          }
+          my_joint->log.msg("synch<resT,argT> fired");
+          arg_ = &t; //setup my arg
+          boost::function0<ResT> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          return cb();  //invoke callback in my thread
+        }
+
+        ResT put(ArgT t, const boost::xtime& w) {
+          return operator()(t, w);
+        }
+  
+        ArgT &top(void) {
+          return *arg_;
+        }
+        void pop_top(void) {
+        }
+      private:
+        size_t max_size_;
+        ArgT *arg_;
+      };
+
+
+      template <typename ArgT>
+      class synch_impl<void, ArgT> : public synch_port<void>, private boost::noncopyable {
+      public:
+        typedef ArgT argument_type;
+        typedef void result_type;
+
+        synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<void>(j), max_size_(sz), arg_(0) {//sz==0 means unlimited
+        } 
+
+        void operator()(ArgT t) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<void, ArgT>::operator(t) enter");
+
+          chord_base<void> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<void,argT> wait");
+            this->wait_fired(lock, chd);
+          }
+          my_joint->log.msg("synch<void, argT> fired");
+          arg_ = &t; //setup my arg
+          boost::function0<void> cb;
+          chd->capture_arguments(cb);
+          my_joint->log.msg("synch<void, argT> fired-bef-callback");
+          my_joint->check_heartbeat();
+          lock.unlock();
+          cb();  //invoke callback in my thread
+        }
+
+        void put(ArgT t) {
+          operator()(t);
+        }
+
+        void operator()(ArgT t, const boost::xtime& timeout) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<void, ArgT>::operator(t, timeout) enter");
+
+          chord_base<void> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<void,argT> timed wait");
+            this->timed_wait_fired(lock, chd, timeout);
+          }
+          my_joint->log.msg("synch<void, argT> fired");
+          arg_ = &t; //setup my arg
+          boost::function0<void> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          cb();  //invoke callback in my thread
+        }
+
+        void put(ArgT t, const boost::xtime& w) {
+          operator()(t, w);
+        }
+    
+        ArgT &top(void) {
+          return *arg_;
+        }
+        void pop_top(void) {
+        }
+      private:
+        size_t max_size_;
+        ArgT *arg_;
+      };
+
+      template <typename ResT>
+      class synch_impl<ResT,void> : public synch_port<ResT>, private boost::noncopyable {
+      public:
+        typedef void_t argument_type;
+        typedef ResT result_type;
+
+        synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<ResT>(j), max_size_(sz) {} 
+
+        ResT operator()(void) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+        
+          my_joint->log.msg("synch<ResT,void>::operator(void) enter");
+
+          chord_base<ResT> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<resT,void> wait");
+            this->wait_fired(lock, chd);
+          }
+          my_joint->log.msg("synch<ResT,void> fired");
+          boost::function0<ResT> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          return cb();  //invoke callback in my thread
+        }
+
+        ResT put(void) {
+          return operator()();
+        }
+
+        ResT operator()(const boost::xtime& timeout) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<ResT,void>::operator(timeout) enter");
+
+          chord_base<ResT> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<resT,void> timed wait");
+            this->timed_wait_fired(lock, chd, timeout);
+          }
+          my_joint->log.msg("synch<ResT,void> fired");
+          boost::function0<ResT> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          return cb();  //invoke callback in my thread
+        }
+
+        ResT put(const boost::xtime& w) {
+          return operator()(w);
+        }
+
+        void pop_top(void) {
+        }
+
+        void_t top(void) {
+          return void_t();
+        }
+      private:
+        size_t max_size_;
+      };
+
+
+      template <>
+      class synch_impl<void, void> : public synch_port<void>, private boost::noncopyable {
+      public:
+        typedef void_t argument_type;
+        typedef void result_type;
+
+        synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<void>(j), max_size_(sz){} 
+
+        void operator()(void) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<void,void>::operator(void) enter");
+
+          chord_base<void> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<void,void> wait");
+            this->wait_fired(lock, chd);
+          }
+          my_joint->log.msg("synch<void,void> fired");
+          boost::function0<void> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          cb();  //invoke callback in my thread
+        }
+
+        void put(void) {
+          operator()();
+        }
+
+        void operator()(const boost::xtime& timeout) {
+          boost::shared_ptr<joint_base> my_joint(this->joint_); 
+          if(my_joint == 0) 
+            throw not_in_chord_exception();
+          boost::mutex::scoped_lock lock(my_joint->mutex_);
+          if (max_size_> 0 && this->num_pending_ >= max_size_)
+            throw queue_overflow_exception();
+
+          my_joint->log.msg("synch<void,void>::operator(timeout) enter");
+
+          chord_base<void> *chd = 0;
+          if(this->test_chord_fire()) chd = this->get_chord_fired();
+          while (chd == 0) {
+            //no chord ready to fire 
+            //block wait here till awaken by another msg
+            my_joint->log.msg("synch<void,void> timed wait");
+            this->timed_wait_fired(lock, chd, timeout);
+          }
+          my_joint->log.msg("synch<void,void> fired");
+          boost::function0<void> cb;
+          chd->capture_arguments(cb);
+          my_joint->check_heartbeat();
+          lock.unlock();
+          cb();  //invoke callback in my thread
+        }
+
+        void put(const boost::xtime& w) {
+          operator()(w);
+        }
+  
+        void pop_top(void) {
+        }
+
+        void_t top(void) {
+          return void_t();
+        }
+
+      private:
+        size_t max_size_;
+      };
+
+    }
+
+    //------ Pimpl wrappers ------
+
+    template <typename ArgT, typename QueT=std::deque<ArgT> > 
+    class async {
+      template <template <size_t> class, size_t> friend class joint_t;
+    public:
+      typedef detail::async_impl1<ArgT, QueT> ImplT;
+      typedef typename ImplT::argument_type argument_type;
+      typedef typename ImplT::result_type result_type;
+      async(size_t sz=0) : max_size_(sz) {}
+      void put(ArgT t) {
+        if (pimpl_ == 0) throw not_in_chord_exception();
+        pimpl_->put(t);
+      }
+      void operator()(ArgT t) { put(t); }
+      void detach() { 
+        if (!pimpl_)
+          pimpl_->detach();
+      }
+      void reset_joint() {
+        if (pimpl_ != 0)
+          pimpl_->reset_joint();
+      }
+    private:
+      size_t max_size_;
+      boost::shared_ptr<ImplT> pimpl_;
+    };
+
+    template <> 
+    class async<void> {
+      template <template <size_t> class, size_t> friend class joint_t;
+    public:
+      typedef detail::async_impl0 ImplT;
+      typedef ImplT::argument_type argument_type;
+      typedef ImplT::result_type result_type;
+      async(size_t sz=0) : max_size_(sz) {}  
+      void put(void) {
+        if (!pimpl_) throw not_in_chord_exception();
+        pimpl_->put();
+      }
+      void operator()(void) { put(); }
+      void detach() { 
+        if (pimpl_ != 0)
+          pimpl_->detach();
+      }
+      void reset_joint() {
+        if (pimpl_ != 0)
+          pimpl_->reset_joint();
+      }
+    private:
+      size_t max_size_;
+      boost::shared_ptr<ImplT> pimpl_;
+    };
+
+    template <typename ResT, typename ArgT>
+    class synch {
+      template <template <size_t> class, size_t> friend class joint_t;
+    public:
+      typedef detail::synch_impl<ResT, ArgT> ImplT;
+      typedef typename ImplT::argument_type argument_type;
+      typedef typename ImplT::result_type result_type;
+      synch(size_t sz=0) : max_size_(sz) {}
+      ResT put(ArgT t) {
+        if (pimpl_ == 0) throw not_in_chord_exception();
+        return pimpl_->put(t);
+      }
+      ResT operator()(ArgT t) { return put(t); }
+      void detach() { 
+        if (pimpl_ != 0)
+          pimpl_->detach();
+      }
+      void reset_joint() {
+        if (pimpl_ != 0)
+          pimpl_->reset_joint();
+      }
+    private:
+      size_t max_size_;
+      boost::shared_ptr<ImplT> pimpl_;
+    };
+
+    template <typename ResT>
+    class synch<ResT,void> {
+      template <template <size_t> class, size_t> friend class joint_t;
+    public:
+      typedef detail::synch_impl<ResT, void> ImplT;
+      typedef typename ImplT::argument_type argument_type;
+      typedef typename ImplT::result_type result_type;
+      synch(size_t sz=0) : max_size_(sz) {}
+      ResT put(void) {
+        if (pimpl_ == 0) throw not_in_chord_exception();
+        return pimpl_->put();
+      }
+      ResT operator()(void) { return put(); }
+      void detach() { 
+        if (pimpl_ != 0)
+          pimpl_->detach();
+      }
+      void reset_joint() {
+        if (pimpl_ != 0)
+          pimpl_->reset_joint();
+      }
+    private:
+      size_t max_size_;
+      boost::shared_ptr<ImplT> pimpl_;
+    };
+      
+    template <typename ArgT>
+    class synch<void,ArgT> {
+      template <template <size_t> class, size_t> friend class joint_t;
+    public:
+      typedef detail::synch_impl<void, ArgT> ImplT;
+      typedef typename ImplT::argument_type argument_type;
+      typedef typename ImplT::result_type result_type;
+      synch(size_t sz=0) : max_size_(sz) {}
+      void put(ArgT t) {
+        if (pimpl_ == 0) throw not_in_chord_exception();
+        pimpl_->put(t);
+      }
+      void operator()(ArgT t) { put(t); }
+      void detach() { 
+        if (pimpl_ != 0)
+          pimpl_->detach();
+      }
+    private:
+      size_t max_size_;
+      boost::shared_ptr<ImplT> pimpl_;
+    };
+
+    template <>
+    class synch<void,void> {
+      template <template <size_t> class, size_t> friend class joint_t;
+    public:
+      typedef detail::synch_impl<void, void> ImplT;
+      typedef ImplT::argument_type argument_type;
+      typedef ImplT::result_type result_type;
+      synch(size_t sz=0) : max_size_(sz) {}
+      void put() {
+        if (pimpl_ == 0) throw not_in_chord_exception();
+        pimpl_->put();
+      }
+      void operator()() { put(); }
+      void detach() { 
+        if (pimpl_ != 0)
+          pimpl_->detach();
+      }
+      void reset_joint() {
+        if (pimpl_ != 0)
+          pimpl_->reset_joint();
+      }
+    private:
+      size_t max_size_;
+      boost::shared_ptr<ImplT> pimpl_;
+    };
+
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/base/utils.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/utils.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,77 @@
+//
+// boost/join/utils.hpp
+//
+// Copyright (c) 2007 - 20089 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_UTILS_HPP
+#define BOOST_JOIN_UTILS_HPP
+
+#include <iostream>
+#include <string>
+#include <boost/thread.hpp>
+#include <sstream>
+
+namespace boost {
+  namespace join {
+
+    class logger : public std::ostringstream {
+    public:
+      const char *name_;
+      static boost::mutex mutex_; 
+
+      logger(const char *n = 0) : name_(n) {
+      }
+
+      void msg(std::string str) {
+        if (name_ == 0) return;
+        boost::mutex::scoped_lock lock(mutex_);
+        std::cout << "[" << name_ << "] : " << str << std::endl;
+      }
+      std::ostringstream & stream(void) {
+        boost::mutex::scoped_lock lock(mutex_);
+#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
+        return *(new logger(name_));
+#else
+        return *this;
+#endif
+      }
+      void flush(void) {
+        boost::mutex::scoped_lock lock(mutex_);
+        if(name_ != 0)
+          std::cout << "[" << name_ << "] " << this->str();
+        this->str("");
+      }
+      void flushl(void) {
+        boost::mutex::scoped_lock lock(mutex_);
+        if(name_ != 0)
+          std::cout << "[" << name_ << "] " << this->str() << std::endl;
+        this->str("");
+      }
+      typedef void func(logger &l);
+      static void end(logger &l) { 
+        l.flush(); 
+#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
+        delete &l;
+#endif
+      }
+      static void endl(logger &l) { 
+        l.flushl(); 
+#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
+        delete &l;
+#endif
+      }
+    };
+    std::ostream &operator << (std::ostream &os, logger::func *f) {
+      f((logger&)os);
+      return os;
+    }
+    boost::mutex logger::mutex_;
+  }
+}
+
+#endif
+
Added: sandbox/join/boost/join/idioms/asio_executor.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/idioms/asio_executor.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,34 @@
+//
+// boost/join/executor.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_ASIO_EXECUTOR_HPP
+#define BOOST_JOIN_ASIO_EXECUTOR_HPP
+
+#include <boost/asio.hpp>
+#include <boost/join/join.hpp>
+
+namespace boost {
+  namespace join {
+
+    //integration with Boost.Asio
+    //submit async tasks to asio's completion_event queue to be executed by main thread
+    class asio_executor {
+    public:
+      boost::asio::io_service& io_service_;
+      asio_executor(boost::asio::io_service& io_service): io_service_(io_service) {}
+      template <typename task_type>
+      void operator()(task_type task) {
+        io_service_.post(task); 
+      }
+    };
+
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/idioms/executor.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/idioms/executor.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,108 @@
+//
+// boost/join/executor.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_SIMPLE_EXECUTOR_HPP
+#define BOOST_JOIN_SIMPLE_EXECUTOR_HPP
+
+#include <iostream>
+#include <map>
+#include <boost/thread.hpp>
+#include <boost/join/join.hpp>
+
+namespace boost {
+  namespace join {
+
+    template <template <size_t> class scheduler=sched_first_match, size_t sz=32>
+    class executor_t : public joint_t<scheduler, sz> {
+    public:
+      typedef scheduler<sz> sched_type;
+      typedef joint_t<scheduler, sz> joint_type;
+      typedef typename joint_type::callable task;
+
+      //api: one default task queue
+      async<task> execute;
+      synch<void,void> shutdown;
+
+      void spawn(task t) { execute(t); }
+
+      executor_t(int num, const char *name = 0) : joint_type(0, 0, name) {
+        chord(run, execute, &executor_t::exec_cb);
+        chord(run, stopped, &executor_t::stop_cb);
+        chord(shutdown, started, &executor_t::shutdown_cb);
+        chord(shutdown, stopped, &executor_t::stopped_cb);
+        num_threads_ = num;
+        for(int i=0; i<num; i++)
+          threads_.create_thread(boost::bind(&executor_t::main_loop, this));
+        started(); //init state
+      }
+      ~executor_t() {
+        shutdown();
+      }
+
+    private:
+      synch<bool,void> run;
+      //executor states
+      async<void> started;
+      async<void> stopped;
+      //thread pool
+      boost::thread_group threads_;
+      int num_threads_;
+
+      //entry func of each thread - a loop which exists when no tasks in execute queue and shutdown
+      void main_loop(void) {
+        this->log->msg("a thread starts...");
+        while(run()) {}
+        this->log->msg("a thread exits...");
+      }
+      bool exec_cb(void_t run, task tasklet) {
+        this->log->msg("start one task...");
+        try {
+          tasklet();
+        }
+        catch (join_exception &je) {
+          this->log->msg(je.what());
+        }
+        catch (...) {
+          this->log->msg("UNKNOWN exceptions happen inside a executor thread, ignore.");
+        }
+        this->log->msg("finish one task...");
+        return true; //worker thread continue
+      }
+      bool stop_cb(void_t run, void_t stopd) {
+        return false; //worker thread exit
+      }
+      void shutdown_cb(void_t shdn, void_t started) {
+        this->log->msg("shutdown...");
+        for(int i=0; i<num_threads_; i++) //stop all threads in pool
+          stopped();
+        //waiting for the threads to exit
+        this->log->msg("wait...");
+        threads_.join_all();    
+        this->log->msg("all threads exit, done...");
+        stopped(); //add one more stopped(), in case shutdown() is called more than once
+      }
+      void stopped_cb(void_t shdn, void_t stopd) {
+        this->log->msg("stopped...");
+        stopped();
+      }
+    };
+
+    //define a default executor_t 
+    class executor : public executor_t<> {
+    public:
+      executor(int num, const char *name = 0) : 
+        executor_t<>(num, name)
+      {
+      }
+    };
+
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/idioms/rr_executor.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/idioms/rr_executor.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,128 @@
+//
+// boost/join/rr_executor.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_RR_EXECUTOR_HPP
+#define BOOST_JOIN_RR_EXECUTOR_HPP
+
+#include <iostream>
+#include <map>
+#include <boost/thread.hpp>
+#include <boost/join/join.hpp>
+
+namespace boost {
+  namespace join {
+
+    template <template <size_t> class scheduler=sched_pri_round_robin, size_t sz=32>
+    class rr_executor_t : public joint_t<scheduler, sz> {
+    public:
+      typedef scheduler<sz> sched_type;
+      typedef joint_t<scheduler, sz> joint_type;
+      typedef typename joint_type::callable task;
+      typedef std::map<size_t, async<task> >  que_map_type;
+
+      //static api: one default task queue
+      async<task> execute;
+      synch<void,void> shutdown;
+
+      void spawn(task t) { execute(t); }
+
+      //dynamic api: dynamically added task queues
+      async<task> task_queue(size_t i=0) {
+        //since rr_executor itself uses async/synch_p<> methods, we can have task
+        //queues [1-(sz-5)]
+        if (i>(sz-5)) i = i%(sz-5); 
+        if(i==0) return execute;
+        if(this->my_scheduler() != schedule_round_robin) {
+          return execute;
+        }
+        typename que_map_type::iterator iter;
+        if ((iter=que_map_.find(i)) != que_map_.end())
+          return iter->second;
+        else {
+          this->log->msg(" creating task_queue ......");
+          async<task> nq;
+          chord(run, nq, &rr_executor_t::exec_cb);
+          que_map_[i] = nq;
+          return nq;        
+        }
+      }
+
+      rr_executor_t(int num, const char *name = 0) : 
+        joint_type(0, 0, name) {
+        chord(run, execute, &rr_executor_t::exec_cb);
+        chord(run, stopped, &rr_executor_t::stop_cb, 1);
+        chord(shutdown, started, &rr_executor_t::shutdown_cb, 1);
+        chord(shutdown, stopped, &rr_executor_t::stopped_cb, 1);
+        for(int i=0; i<num; i++)
+          threads_.create_thread(boost::bind(&rr_executor_t::main_loop, this));
+        started(); //init state
+      }
+      ~rr_executor_t() {
+        shutdown();
+      }
+
+    private:
+      synch<bool,void> run;
+      //rr_executor states
+      async<void> started;
+      async<void> stopped;
+      boost::thread_group threads_;
+      //dynamic api: dynamically added task queues
+      que_map_type que_map_;
+
+      void main_loop(void) {
+        this->log->msg("a thread starts...");
+        while(run()) {}
+        this->log->msg("a thread exits...");
+      }
+      bool exec_cb(void_t run, task tasklet) {
+        this->log->msg("start one task...");
+        try {
+          tasklet();
+        }
+        catch (join_exception &je) {
+          this->log->msg(je.what());
+        }
+        catch (...) {
+          this->log->msg("UNKNOWN exceptions happen inside a rr_executor thread, ignore.");
+        }
+        this->log->msg("finish one task...");
+        return true; //worker thread continue
+      }
+      bool stop_cb(void_t run, void_t stopd) {
+        stopped(); 
+        return false; //worker thread exit
+      }
+      void shutdown_cb(void_t shdn, void_t started) {
+        this->log->msg("shutdown...");
+        stopped();
+        //waiting for the threads to exit
+        this->log->msg("wait...");
+        threads_.join_all();    
+        this->log->msg("all threads exit, done...");
+      }
+      void stopped_cb(void_t shdn, void_t stopd) {
+        this->log->msg("stopped...");
+        stopped();
+      }
+    };
+
+    //define a default rr_executor_t taking 32 async / synch methods or at most 27 task queues
+    class rr_executor : public rr_executor_t<> {
+    public:
+      rr_executor(int num, const char *name = 0) : 
+        rr_executor_t<>(num, name)
+      {
+      }
+    };
+
+  }
+}
+
+#endif
Added: sandbox/join/boost/join/join.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/join.hpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,19 @@
+//
+// boost/join/join.hpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_HPP
+#define BOOST_JOIN_HPP
+
+#include <boost/join/base/port.hpp>
+#include <boost/join/base/joint.hpp>
+#include <boost/join/idioms/executor.hpp>
+#include <boost/join/idioms/rr_executor.hpp>
+
+#endif
+
Added: sandbox/join/libs/join/doc/boost_join_design.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/boost_join_design.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,301 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+  <title>Join Design</title>
+  <meta content="Yigong Liu" name="author">
+  <meta content="Join Design Doc" name="description">
+</head>
+<body>
+<h2 style="text-align: center;">Join - Asynchronous Message
+Coordination and
+Concurrency
+Library</h2>
+<div style="text-align: center;">
+<h2>Yigong Liu (2007-2009)</h2>
+<h4>yigongliu_at_[hidden]</h4>
+<p style="color: rgb(255, 0, 0);">Warning:  This library is NOT an
+official Boost library yet </p>
+<hr style="width: 100%; height: 2px;">
+<ol style="text-align: left;" id="mozToc">
+<!--mozToc h3 1-->
+  <li>Introduction</li>
+  <li><a href="chords_joints.html">Chords and
+Joints</a></li>
+  <li>Samples</li>
+  <li>Synopsis of Join Classes</li>
+  <li>Join Internals</li>
+  <li>References</li>
+</ol>
+<hr style="width: 100%; height: 2px;"></div>
+<h3><a class="mozTocH3" name="mozTocId387457"></a>Introduction</h3>
+Join is an asynchronous message coordination and concurrency library
+based
+on concepts and techniques developed in JoCaml[1] and Cω[2]. It is
+applicable both to
+multithreaded applications
+and
+to the orchestration of asynchronous message flows.<br>
+<br>
+Join supports message orchestration with a few simple
+abstractions:<br>
+<ol>
+  <li>
+    <h4> typed asynchronous and synchronous message
+ports<br>
+    </h4>
+  </li>
+</ol>
+<div style="text-align: left;">
+<div style="margin-left: 40px;">These are function objects used as
+typed ports or interfaces to message flows. Message passing is done by
+invoking or
+calling these function objects. Ports by themselves
+are not
+complete message passing constructs yet, they are merely interfaces to
+these constructs which are created when ports are "joined"
+together thru chords. For example, a message queue has a sending port
+and receiving port.<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>   <span style="font-weight: bold;">async<MsgT></span></li>
+</ul>
+<div style="margin-left: 80px;">A asynchronous port provides
+one-way,
+non-blocking calls. Message passing thru async<T> port is
+guaranteed to return immediately; internally there
+could be a queue to buffer the arguments or message.<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>   <span style="font-weight: bold;">synch<ResT,
+MsgT></span></li>
+</ul>
+<div style="margin-left: 80px;">A synchronous port is similar to
+normal
+method call, in that the calling thread will block till result is
+returned. However a normal method call just
+involve the calling thread, a call to synch<R,T> port could
+involves
+multiple threads and synchronization.<br>
+</div>
+</div>
+<ol start="2">
+  <li>
+    <h4>chords (or join patterns)<br>
+    </h4>
+  </li>
+</ol>
+<div style="margin-left: 40px;">A chord binds a set of ports to the
+message processing function which will be invoked when all ports in the
+set are called and messages are available.<br>
+</div>
+   <br>
+<div style="margin-left: 40px;">In Cω, a thread-safe buffer can be
+defined as following:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">public
+class Buffer {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  
+   public async Put(string s);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  
+   <span style="font-style: italic;">public string Get()
+& Put(string s) { return s; }   //a chord</span></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">} </span><br>
+</div>
+Here a chord is defined with a synchronous Get port and asynchronous
+Put port;
+the calling of Get will block if no Put is called yet, otherwise the
+string sent by Put will be returned to Get.   <br>
+<br>
+In Join, the buffer class can be
+defined in Cω style as
+following:<br>
+<div style="margin-left: 40px;">class buffer : public <span
+ style="font-weight: bold;">joint</span> {<br>
+public:<br>
+<span style="font-weight: bold;">    async<string>
+put;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">   
+synch<string,void> get;</span><br>
+    buffer() {<br>
+<span style="font-weight: bold;">       
+<span style="font-style: italic;">chord(get, put, &buffer::proc);</span></span><br>
+    }<br>
+<span style="font-weight: bold;">    string proc(void_t
+g, string p) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">   
+     return p;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    }</span><br>
+<span style="font-weight: bold;"></span>};<br>
+<br>
+or with boost::lambda, it becomes:<br>
+class buffer {<br>
+public:<br>
+<span style="font-weight: bold;">    async<string>
+put;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    synch<string,
+void> get;</span><br style="font-weight: bold;">
+    buffer() {<br>
+<span style="font-weight: bold;">       <span
+ style="font-style: italic;">joins().chord(get, put, lambda::_2);</span></span><br
+ style="font-weight: bold;">
+    }<br>
+}<br>
+</div>
+</div>
+<div style="margin-left: 40px;">   <br>
+</div>
+<div style="margin-left: 40px;">In JoCaml, such a buffer can be created
+with a "factory" function:<br>
+<span style="font-weight: bold;">       let
+create_buffer() = </span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      
+      def put(n) & get() = reply n to get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+      put, get</span><br>
+<br>
+Similarly in Join, with the help of boost lambda and tuples, a factory
+function can be defined in JoCaml style as following:<br>
+<span style="font-weight: bold;">      
+template <typename T></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      
+tuple<async<T>, synch<T,void> > create_buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+      async<T> put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+      synch<T,void> get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+      <span style="font-style: italic;">joins().chord(get,
+put, lambda::_2);</span></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      
+      return make_tuple(put, get);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">       }</span><br>
+             <br>
+such a buffer can be safely used in
+multithread applications:<br>
+<div style="margin-left: 40px;">buffer
+b;<br>
+b.put("Hello");
+b.put("World");<br>
+cout <<
+b.get() << b.get() << endl;<br>
+</div>
+</div>
+<br>
+<div style="margin-left: 40px;">Difference of Join with Cω in
+Chord definitions:<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>Cω's chord defintion is "static", declared in class definition,
+cannot change during runtime, can be optimized by compiler.</li>
+  <li>Join's chord definition is "dynamic", created by chord()
+functions, can be changed during runtime. It can support dynamic join
+programming similar
+to JoCaml and CCR[4].</li>
+</ul>
+<ol start="3">
+  <li>
+    <h4>joint<br>
+    </h4>
+  </li>
+</ol>
+<div style="margin-left: 40px;">A joint "joins" together a set of
+chords, each of which may bind several ports and may share some ports
+with each other, thus creating competing requests for
+messages, which are synchronized by joint's internal logic. A joint
+defines a complete synchronized concurrent message passing contruct. A
+joint is
+where message ports meet
+and processing logic are attached thru chords. All application code
+which use
+async/synch ports and chords to define concurrent behaviours should
+either inherit class joint or use a joint instance to join
+message flows together. Joint maintains the internal state
+about the
+message
+arrivals and synchronization, and firing chords when all of their
+messages are available.<br>
+<br>
+</div>
+In Join based applications, with the help of simple thread-pool based
+executors (which can be built on top of Join's primitives), we can
+develop concurrent
+applications without
+explicit usage of threads (thread creation and synchronization);
+concurrency are mostly defined and controlled by chords with only
+async<> ports.<br>
+<div style="margin-left: 40px;"></div>
+<h3><span class="mozTocH3"></span>License</h3>
+Join is licensed under the <a
+ href="http://www.boost.org/LICENSE_1_0.txt">Boost Software License</a>.<br>
+<h3><span class="mozTocH3"></span>Installation<br>
+</h3>
+Join is continuously being developed and tested in Linux (Fedora
+Core & Ubuntu) and Windows (WindowsXP/Vista and Visual C++ 2008).
+The
+implementation is solely based on standard boost facilities
+(Boost.Bind, Boost.Function, Boost.Shared_Ptr, etc.) which will be part
+of next C++ standard. <br>
+Download: http://port.sourceforge.net<br>
+Installation: <br>
+Join is a header only library. There is no need to build the
+library itself to use it. Please following these steps:<br>
+<div style="margin-left: 40px;"><a
+ href="http://www.boost.org/more/getting_started.html">download or
+checkout boost distribution</a><br>
+download latest boost_join_x_x.tar.gz<br>
+tar xvzf boost_join_x_x.tar.gz<br>
+add boost's directory and Join's directory to compilers' include
+path<br>
+cd to <boost_join_directory>/libs/join/exmaple<br>
+bjam<br>
+</div>
+<h3>Acknowlegement<br>
+</h3>
+The original theory and systems of Join Calculus / JoCaml are developed
+by Cédric Fournet, Georges
+Gonthier, Jean-Jacques Lévy, Luc Maranget and Didier Rémy
+and others at MOSCOVA INRIA.<br>
+At Microsoft Research, Cω is developed by Nick Benton, Luca Cardelli,
+Cédric Fournet and others. Claudio Russo developed the C# Joins
+library which is an efficient combinator library for Cω-style join
+patterns, implemented in C# 2.0 generics and accessible from multiple
+.Net languages.<br>
+An earlier version of the Join library described here is based on
+both Cω and Jocaml
+which supports multiple synchronous methods per chord (join-pattern)
+and propagation of exceptions to all synchronous callers. <br>
+The current version
+is mostly based on Cω, supporting only one synchronous method per chord
+for simplicity and better performance.<br>
+Thanks Dr. Claudio Russo at Microsoft Research for many insightful
+email
+discussions, clarifying many of my questions about Cω internals and
+semantics, and many helpful links to other Join based projects.<br>
+Thanks Dr. Luc Maranget at Moscova INRIA for many kind advices and
+links to papers, for pointing out the advantages (simplification and
+optimization) of the single-synch per chord design.<br>
+In the earlier version of this Join library, the functional signature
+API of async / synch methods is modeled after
+Douglas Gregor's Boost.Signals.<br>
+For earlier versions which support multiple
+synchronous methods and exception propagation,  the scheme of
+catching
+exceptions in one thread, and rethrowing it in another thread is
+based on Christopher M. Kohlhoff's idea and sample code posted in boost
+email
+list discussions.<br>
+<br>
+<br>
+<div style="margin-left: 40px;"></div>
+<span style="font-weight: bold;">
+</span>
+</body>
+</html>
Added: sandbox/join/libs/join/doc/chords_joints.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/chords_joints.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,256 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+  <title>Chords and Joints</title>
+</head>
+<body>
+<h2>Chords and Joints<br>
+</h2>
+<h3>The Role of Chords, and Joints</h3>
+In sequential languages, a common construct of control flow is "switch
+/ case" statements:<br>
+<div style="margin-left: 40px;">switch(val) {<br>
+case val1: statements1; break;<br>
+case val2: statements2; break;<br>
+...<br>
+default: ...<br>
+}<br>
+</div>
+<br>
+In CSP based concurrent languages, such as Occam, Limbo, etc., there is
+a "ATL" construct to branch control flow based on the communication
+readiness of channels:<br>
+<div style="margin-left: 40px;">alt {<br>
+    i = <- inchan =><br>
+       statements1;<br>
+    outchan <- = "sent it!" =><br>
+       statements2;<br>
+}<br>
+</div>
+Different blocks of statements will run based on the communication
+readiness of channels (inchan, outchan). However in "alt", if the
+qualifier before "=>" contains several channel communication
+operations (<-, ->), only the first one will be tested. The
+relationship between different "alting" segments are OR relationship.<br>
+<br>
+In JoCaml, an "count-down" idiom is defined as :<br>
+<div style="margin-left: 40px;">def count(n) &tick() = count(n-1)<br>
+or count(0) & wait() = reply to wait<br>
+</div>
+There are 2 join-patterns (chords) here: count(n) & tick(),
+count(0) & wait(), each of which contains 2 ports. Similar to CSP,
+join-patterns act as qualifier "guarding" the actions statements
+following it. If all two ports in a join-pattern are called, the action
+statements of this chord is ready to run. The relationship inside a
+join-pattern or chord is AND, while the relationship between different
+chords are OR.<br>
+<br>
+Similarly in Join, joints act as a "switch / case" construct for
+message passing, run different codes depending on the communication
+readiness of ports. So it is a construct for message coordination and
+orchestration, such as following code, different operations are done
+based on readiness of ports:<br>
+async<T1> port1;<br>
+async<T2> port2;<br>
+async<T3> port3;<br>
+joins(executor)<br>
+    .chord(port1, port2, operation1)<br>
+    .chord(port2, port3, operation2);<br>
+<h3>Chord Definition</h3>
+Chords are created when joint's chord() methods are invoked. A chord
+binds a set of ports to the processing logic which consume and
+process messages from those ports. The processing logic is a
+function object which can be in the form of normal function, lambda,
+class method, or object method. Its signature can be deduced as
+following:<br>
+<div style="margin-left: 40px;">for a chord of a set of ports with
+types: <br>
+    PortT1, PortT2, PortT3, ...<br>
+the chord binds to a function object "call" with signature:<br>
+    <span style="font-weight: bold;">PortT1::result_type
+call(PortT1::argument_type,
+PortT2::argument_type, PortT3::argument_type, ...);</span><br>
+</div>
+For async<MsgT> port, its result_type is "void", and its
+argument_type is the message type it carries - MsgT.<br>
+For synch<ResT,MsgT>, obviously its result_type is ResT and its
+argument_type is MsgT.<br>
+For ports sending "void" messages (async<void>, synch<ResT,
+void>), their argument type is "void_t" defined as <span
+ style="font-weight: bold;">struct void_t {}</span>; so C/C++ compilers
+can be satisfied.<br>
+<br>
+In the set of ports of a chord, there can be at most one
+synch<R,T> port; and if there is one, it must be the first
+argument of chord and so its result_type is the return type of chord's
+"call" function object.<br>
+<br>
+So a thread safe message queue with async send port and synchronous
+"blocking" recv port can be defined as following:<br>
+    template <MsgT><br>
+    class msg_que : public joint {<br>
+    public:<br>
+       async<MsgT> send;<br>
+       synch<MsgT,void> recv;<br>
+       msg_que() {<br>
+          chord(recv, send,
+&msg_que::proc);<br>
+       }<br>
+    private:<br>
+       MsgT proc(void_t r, MsgT s) { return s;
+}<br>
+    }<br>
+Please note that synchronous port "recv" is used as the first
+argument of chord; and the signature of  msg_que::proc matches the
+argument types of chord's ports.<br>
+<br>
+Using boost::lambda, the msg_que class can be redefined as following:<br>
+    template <MsgT><br>
+    class msg_que {<br>
+    public:<br>
+       async<MsgT> send;<br>
+       synch<MsgT,void> recv;<br>
+       msg_que() {<br>
+          joins().chord(recv, send,
+_2);<br>
+       }<br>
+    }<br>
+<h3>Joint Definition</h3>
+A joint "joins" a set of chords which may share ports, thus
+competing for messages. Internally joint synchronizes the consumption
+of messages and schedules the firing of chords.<br>
+A joint can be defined and used in 2 ways:<br>
+<ol>
+  <li>The joint initially defines a set of chords which later may need
+to be changed (by chord_remove, chord_override, reset) to change
+message processing logic. In this case we need a name or reference to
+the "joint" object to invoke those methods. For this, the "joint" class
+can be used as parent of application classes, or a "joint" object can
+be instantiated; for example:</li>
+</ol>
+<div style="margin-left: 40px;">async<T1> chan1;<br>
+async<T2> chan2;<br>
+async<T3> chan3;<br>
+joint joins1(executor);<br>
+joins1<br>
+    .chord(chan1, chan2, proc1)<br>
+    .chord(chan2, chan3, proc2);<br>
+...<br>
+joins1.override_chord(chan1, chan2, new_proc);<br>
+joins1.remove_chord(...);<br>
+</div>
+<ol start="2">
+  <li>In many cases, joint is used one-shot to create a "fixed" set of
+synchronization chords which never change during its lifetime. And
+async<> / synch<> ports are the primary programming
+interfaces. We can
+use "factory" funtions "joins() / joins_t()" to create unnamed joints
+for this purpose:</li>
+</ol>
+<div style="margin-left: 40px;">async<T1> chan1;<br>
+async<T2> chan2;<br>
+async<T3> chan3;<br>
+joins(executor)<br>
+    .chord(chan1, chan2, proc1)<br>
+    .chord(chan2, chan3, proc2);<br>
+<br>
+</div>
+The variance of joint can be configured from the following aspects:<br>
+<ul>
+  <li>template parameters: <br>
+  </li>
+  <ul>
+    <li>scheduling policies: <br>
+    </li>
+  </ul>
+</ul>
+<div style="margin-left: 80px;">simple schedulers:<br>
+    sched_first_match, sched_longest_match,
+sched_round_robin;<br>
+priority based schedulers:<br>
+    sched_pri_first_match, sched_pri_longest_match,
+sched_pri_round_robin;</div>
+<ul>
+  <ul>
+    <li>max number of ports: 32 or larger<br>
+    </li>
+  </ul>
+</ul>
+<div style="margin-left: 40px;">The templated joint type can be defined
+similar to:  joint_t<sched_round_robin, 32>. The default
+"joint" type is "joint_t<sched_first_match,32>".<br>
+or unnamed joints can be created with template factory function: 
+joins_t<sched_round_robin, 32>(executor,...). The default factory
+method "joins()" is "joins_t<sched_first_match,32>()".<br>
+</div>
+<ul>
+  <li>instantiation parameters: <br>
+  </li>
+  <ul>
+    <li>executor: if a joint contains chords with all async ports
+(such as the above 2 joint definitions), chord's function have to run
+in a separate thread; executors with type
+boost::function<void(callable)> are used to spawn new tasks.<br>
+    </li>
+    <li>heartbeat: an experiment feature to allow auto-destruction of
+joint and its chords. If a positive heartbeat is specified, after
+"heartbeat" number of firings of chords, this joint will be
+auto-destroyed and ports detached.<br>
+    </li>
+    <li>name: if defined, debuging messages will be printed for this
+joint.<br>
+    </li>
+  </ul>
+</ul>
+<div style="margin-left: 40px;"></div>
+<h3>Runtime Semantics</h3>
+Quoted and modified from <a
+ href="http://research.microsoft.com/Comega/doc/comega_tutorials_concurrency_extensions.htm">Cω
+Concurrency Extensions Tutorials [2]</a>:
+<p style="margin-left: 40px;">The body (message processing logic) of a
+chord can only execute once
+<i>all</i> the ports in
+its header have been called. When a async / synch port
+is called there may be zero, one, or more chords which are enabled
+(ready to fire):</p>
+<ul style="margin-left: 40px;">
+  <li> If no chord is enabled then the port invocation is queued up.
+If the port is asynchronous, then this simply involves adding the
+message to a queue. If the method is
+synchronous, then the calling thread is blocked. </li>
+  <li> If there is a single enabled chord, then the messages of the
+calls involved in the match are de-queued, any blocked thread involved
+in the match is awakened, and the body runs. </li>
+  <li> If there are several chords which are enabled then the joint
+scheduling policy decides which is chosen to run. </li>
+  <li>chord body execution:<br>
+  </li>
+  <ul>
+    <li> When a chord which involves only asynchronous ports runs,
+then it does so in a thread of executors' thread pool.</li>
+  </ul>
+  <ul>
+    <li>If a chord involves a (single) synch<> method, the chord
+body will execute in the thread which calls the synch<> method</li>
+  </ul>
+  <li><span style="font-weight: bold;">Exception handling</span>: if an
+exception is thrown inside a chord
+"body" function object,</li>
+  <ul>
+    <li>if this chord only involves asynchronous methods, its body will
+run in a thread from executor's thread pool, and the exception thrown
+inside chord body will be caught and then simply dropped (in fact if
+executor's log is
+turned on, we'll see it in log), since the semantics of async calls are
+to return immediately with no result.<br>
+    </li>
+    <li>if this chord has a synchronous method, the chord body will
+execute in the calling thread of this synchronous method and this
+caller will also get the exception.<br>
+    </li>
+  </ul>
+</ul>
+</body>
+</html>
Added: sandbox/join/libs/join/doc/internals.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/internals.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,320 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <title>Join Internals</title>
+</head>
+<body>
+<h2 style="text-align: left;">Join Internals</h2>
+<h3>Architecture</h3>
+Join's internal design consists of 2 parts:<br>
+<ol>
+  <li>Low Level Resource Acquisition Core <br>
+  </li>
+</ol>
+<div style="margin-left: 40px;">The core of Join (and similarly
+JoCaml/Cw/C#.Joins) is a simple resource acquisiton (and contention
+management)
+manager which supports atomic acquisition of multiple resources:<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>use a bitmap to represent
+(global) resource-availability status:</li>
+</ul>
+<div style="margin-left: 40px;">
+<ul>
+</ul>
+</div>
+<div style="margin-left: 80px;">
+<ul>
+  <li>each bit is used for
+status of one resource (available/unavailable) or message
+(arrival/missing)</li>
+  <li>bitmap -
+represent the status of multiple resources</li>
+  <li>each acquisition
+(behaviour/pattern) is also represented as a bitmap (chord)</li>
+</ul>
+<ul>
+</ul>
+</div>
+<ul style="margin-left: 40px;">
+  <li>one
+acquisition can try to acquire multiple resources (chord bitmap
+contains
+multiple "1" bits)</li>
+  <li>applications can use
+multiple concurrent / competing acquisition behaviours(patterns) whose
+bitmaps overlap or which
+compete for the same global resources</li>
+  <li>fast
+dispatching / scheduling:</li>
+</ul>
+<ul style="margin-left: 40px;">
+</ul>
+<div style="margin-left: 80px;">whenever a new resource becomes
+available, the global status
+bitmap is updated, and the new global status
+bitmap is compared with chords' bitmaps; if a chord's bitmap
+matches (covered by) the global
+status bitmap, this acquisition can be satisfied (or chord is fired):
+one item is removed from each resource marked by the bits of chord's
+bitmap, and the global status bitmap is updated to reflect new
+status of resource availability.<br>
+If more than one chord can be fired, we can apply
+various scheduling algorithms: round robin, priority based<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li><span style="font-weight: bold;">a single mutex</span> is used to
+protect all
+these bitmaps and comparing logic</li>
+  <li>multiple threads can try
+concurrent acquisitions and contention/conflicts are resolved
+atomically and thread-safely  
+ <br>
+  </li>
+</ul>
+<ol start="2">
+  <li>High Level Messaging Semantics</li>
+</ol>
+<div style="margin-left: 40px;">On top of Join's core
+resource-acquisition manager, Join
+(Jocaml/Cw/C#.Joins) adds the following high level messaging based
+semantics :<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>each bit of global resource bitmap is "set" by a port
+(messages are the resources to
+be
+consumed), so each chord/acquisition-pattern is defined
+by a set of ports</li>
+  <li>each chord/pattern is associated with a callback
+(chord-body) which defines how the resources/messages  (acquired
+by
+this chord from ports/ports) are consumed</li>
+</ul>
+<div style="margin-left: 40px;">Two types of port give
+different semantics :<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>asynchronous: when an asynchronous port is invoked, if no
+acqusition bitmaps is satisfied (no chord can fire), arguments/messages
+are buffered,
+global
+resource bitmap is updated and calling thread returns without blocking</li>
+  <li>synchronous: when a synchronous port is invoked, if no
+acqusition bitmap
+is satisfied (no chord can fire), arguments/messages are kept at stack,
+global
+resource bitmap is updated and calling thread is blocked (<span
+ style="font-weight: bold;">a condition variable </span>is
+used)</li>
+</ul>
+<div style="margin-left: 40px;">If there exist a chord (acquisition
+pattern) which
+can fire, depending on the types of acquisition pattern (chord), there
+are 2 behaviours:<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>if the chord contains a synchronous port, the blocked thread
+of synch-call is waked up and the
+chord-body
+callback is executed by/inside that thread</li>
+  <li>if the chord contains all async ports/ports, the chord-body
+callback is executed in
+"another/different"
+thread (which can be
+a new thread or from thread-pool), this is how "spawning" is done in
+Join</li>
+</ul>
+<h3>Some facts result from the architecture<br>
+</h3>
+<ol start="1">
+  <li>Join's usage of low level synchronization primitives<br>
+  </li>
+</ol>
+<div style="margin-left: 40px;">The rule to get the number of
+(mutex, condition variable)
+used by a
+Join class is simple: <br>
+<ul>
+  <li>each joint holds a mutex which is used to protect all
+code in async<>/synch<>/chord/joint, </li>
+  <li>each synch<> port holds a
+condition variable and </li>
+  <li>async<> ports hold no synchronization primitive<br>
+  </li>
+</ul>
+Although Join's primitives (async<> / synch<> / chord) are
+high level, the author's experience has shown that concurrent
+applications written with Join usually use almost the same number of
+low level synchronization primitives as manually crafted ones.<br>
+<br>
+</div>
+<div style="margin-left: 40px;">For example, a manually crafted thread
+safe message queue will use a mutex and a condition variable to provide
+an asynchronous send/push interface (send and go) and a synchronous
+receive/pop interface (blocking wait if no message available).<br>
+</div>
+<div style="margin-left: 40px;">In Join, a simple thread safe message
+queue can be defined as following:<br>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename msg_type></span><br>
+<span style="font-weight: bold;">class message_queue : public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+async<msg_type> send;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      
+synch<msg_type,void> recv;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      
+message_queue() {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">       
+    chord(recv,
+send,
+&message_queue::forward);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">       }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    private:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+msg_type
+forward(void_t r, msg_type s) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">       
+    return s;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">       }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};</span><br>
+Based on the above rule, since this message_queue class inherits from
+joint class and has one synch<> port, it will use one mutex
+and
+one condition variable, using exactly the same number of low level
+synchronization primitives as manually crafted.   
+   </div>
+<ol start="2">
+  <li>Join's expressiveness<br>
+  </li>
+  <ol>
+  </ol>
+</ol>
+<div style="margin-left: 40px;">Join provides a generic framework to
+compose asynchronous and synchronous behaviours in thread safe manner.
+Join's special internal design makes it
+very expressive in creating concurrent applications:<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>Join is expressive in that it can be used to
+create most common concurrency / synchronization constructs with
+ease,  such as monitor, read-write lock, or joints. See tutorials
+for detailed coding</li>
+  <ul>
+  </ul>
+  <li>Join is expressive in that it is common and easy to
+write
+concurrent (thread-safe) applications using solely Join's primitives
+(async/synch/chord) without using low level threads, mutex and
+condition variables. Again see tutorials for more samples. Please refer
+to this <a href="../../../boost/join/idioms/executor.hpp">Join based
+simple
+executor</a> implmented sole thru Join's primitives.<br>
+  </li>
+</ul>
+<ol start="2">
+  <ol>
+  </ol>
+</ol>
+<ol start="3">
+  <li>Since Join's core
+directly supports atomic acquisition of multiple resources, it helps
+nicely multithreaded applications which involve multiple resources:</li>
+</ol>
+<ul style="margin-left: 40px;">
+  <li>Multithreaded applications whose threads acquiring conflicting
+sets of multiple resources (or locks)  are prone to
+dead-locks.  The normal way to avoid dead-lock is by enforcing a
+consistent (global) order of acquiring resources (locks), as clearly
+explained in Herb Sutter's article <a
+ href="http://www.ddj.com/hpc-high-performance-computing/204801163">"Use
+Lock Hierarchies to Avoid Deadlock"</a>.</li>
+</ul>
+<div style="margin-left: 80px;">Join provides a natural solution to
+this issue. We can use async ports/ports to represent the
+availability status of resources. The existence of messages at these
+async ports represent that resources (locks) are available to be
+consumed. Each acquisition pattern is represented as a chord which has
+one synch port and multiple async ports. For example, to
+resolve the conflicting acquisitions of resources from multiple
+threads, we can define the following resource manager:<br>
+<span style="font-weight: bold;">class resource_manager : joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    async<void>
+resource1;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    async<void>
+resource2;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    async<void>
+resource3;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    async<void>
+resource4;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    ......</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">   
+synch<void,void>
+acquire1;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">   
+synch<void,void>
+acquire2;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">   
+synch<void,void>
+acquire3;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    ......</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    resource_manager() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+chord(acquire1, resource1, resource3, resource4,
+&resource_manager::handle_acquire1);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+chord(acquire2, resource3, resource2, resource1,
+&resource_manager::handle_acquire2);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+chord(acquire3, resource2, resource4,
+&resource_manager::handle_acquire3);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">       ......<br>
+       resource1();<br>
+       resource2();<br>
+       resource3();<br>
+       ......<br style="font-weight: bold;">
+</span><span style="font-weight: bold;">    }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+The above 3 chords define 3 different acquisition patterns competing
+for the same set of resources. Initially a message will be sent on each
+of these async ports to mark resource1,2,3 are available. Different
+threads will call acquire1() / acquire2() / acquire3(), block waiting
+until all of its required resources are available. When a chord is
+fired in one thread, that thread will remove a messages from each of
+async ports marked in the chord pattern, meaning it consumes these
+resources. When this thread is finished with using these resources, it
+will call these async ports to mark the resources available again.
+Please note that the order of these async ports (resources) defined
+in chords are not relevant anymore; since all required async ports
+(resources) are acquired atomically.<br>
+</div>
+<ul style="margin-left: 40px;">
+  <li>In the Futures/Promises concurrency model, one common idiom
+(wait_for_all) is using a future to wait for results from multiple
+asynchronous computations. Join can also be used to implement Futures
+and its wait_for_any and wait_for_all idioms. Please refer to the
+following section on futures/promises
+for more detailed discussions.</li>
+</ul>
+ <br>
+<br>
+</body>
+</html>
Added: sandbox/join/libs/join/doc/references.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/references.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,26 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+  <title>References</title>
+</head>
+<body>
+<h3>References</h3>
+<ol>
+  <li>JoCaml<br>
+  </li>
+  <li><a href="http://research.microsoft.com/Comega/">Comega: "Modern
+Concurrency Abstractions for C#"</a><br>
+  </li>
+  <li><a href="http://research.microsoft.com/%7Ecrusso/joins/index.htm">The
+Joins Concurrency Library</a></li>
+  <li><a
+ href="http://channel9.msdn.com/wiki/default.aspx/Channel9.ConcurrencyRuntime">Coordination
+and Concurrency Runtime (CCR)</a>  <br>
+  </li>
+  <br>
+</ol>
+<br>
+</body>
+</html>
Added: sandbox/join/libs/join/doc/samples.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/samples.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,213 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+  <title>Chords and Joints</title>
+</head>
+<body>
+<h2>Samples</h2>
+Thread safe buffer and message queue
+<br>
+Handle multiple message flows
+<br>
+Using portss as the interface between components<br>
+Joint's life cycle<br>
+<h3><a name="buffer"></a>Thread safe buffer and message queue</h3>
+a thread safe buffer can be defined in various manners.<br>
+<br>
+the most explicit version:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename V></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer: public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<V> put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<V,void> get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    chord(get, put,
+&buffer::chord_body);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  V chord_body(void_t g, V p) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    return p;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer.cpp<br>
+<br>
+simplified version using boost::lambda or boost::phoenix and "joins()"
+factory function:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename V></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<V> put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<V,void> get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    joins().chord(get,
+put, lambda::_2);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer_lambda.cpp<br>
+buffer_phoenix.cpp<br>
+<br>
+Using boost::tuples, boost::phoenix, and "joins()" factory function, we
+can create message queue (essentially a thread safe buffer) using
+factory function in JoCaml
+style:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename V></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">boost::tuple<async<V>,synch<V,void>
+> create_msg_que() {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  async<V> send;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<V,void> recv;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  joins().chord(recv, send, arg2);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  return boost::make_tuple(send,
+recv);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}   <br>
+...<br>
+  async<int> send;<br>
+  synch<int, void> recv;<br>
+  boost::tie(send, recv) = create_msg_que<int>();<br>
+</span></div>
+producer_consumer.cpp<br>
+<h3><a name="flows"></a>Handle multiple message flows</h3>
+A simple demo of joint handling 3 message flows and performing
+different processing on messages when they are available in different
+portss:<br>
+<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">flows_bundle
+make_data_portss(joint::spawn_type e) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<int> flow1;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<int> flow2;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<int> flow3;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  joins(e)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    .chord(flow1,
+flow2, std::cout <<  (arg1 + arg2) << "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    .chord(flow2,
+flow3, std::cout <<  (arg1 - arg2) << "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    .chord(flow1,
+flow3, std::cout <<  (arg1 * arg2) << "\n");</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  return boost::make_tuple(flow1,
+flow2, flow3);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+</div>
+flows.cpp<br>
+<br>
+a sample of joining a vector of message flows into one output flow:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename T></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class join_many : public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  vector<async<T> >
+inputs;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<vector<T>,
+void> output;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  join_many(int num) :
+inputs(num) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    chord(output,
+inputs, &join_many::chord_body);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  vector<T>
+chord_body(void_t out, vector<T> in) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    return in;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+join_many.cpp<br>
+<h3><a name="chain"></a>Using ports as the interface between
+components</h3>
+A sample of a chain of nodes, each node exposes its input and output
+interfaces as ports.<br>
+Each node reads from its input, does some transformation of data, wait
+a second and send it to output interface, all these are performed in
+executor's thread pool.<br>
+The way to connect one node (A) to the next (B) is simple: 
+A.output = B.input.<br>
+<div style="margin-left: 40px; font-weight: bold;">template
+<typename T><br>
+class node {<br>
+  int my_no;<br>
+  boost::function<T(T)> func_;<br>
+public:<br>
+  async<T> input;<br>
+  async<T> output;<br>
+  node(joint::spawn_type e, int n, boost::function<T(T)> f)
+: <br>
+    my_no(n), func_(f), stop(false) {<br>
+    joins(e).chord(input, bind(&node::proc, this,
+_1));<br>
+  }<br>
+private:<br>
+  void proc(T in) {<br>
+        T o = func_(in);<br>
+        thread_sleep(1);<br>
+        output(o);<br>
+  }<br>
+};<br>
+</div>
+chain.cpp<br>
+<br>
+Another sample of using async ports as service interface. The server
+exposes the following interface taking requests in the form of a string
+name and a response port:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">class
+IService {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<tuple<string,
+async<string> > > Service;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};</span><br>
+</div>
+When server receives such a request, it does some work, and send then
+result in response port.<br>
+In other side, clients first create some response ports (Result2.first
+and Result2.second) and pack them inside service requests when sending
+requests to servers; then block waiting on these response ports for
+the results from servers.<br>
+async_call_ret.cpp<br>
+<h3><span style="font-weight: bold;"><a name="lifecycle"></a>Joint's
+life cycle</span></h3>
+After the initial definition, a joint's set of chords can modified thru
+methods: chord_override(), chord_remove() or reset(). And if a joint's
+heartbeat is set during its creation, its set of chords will be
+auto-destroyed after "heartbeat" number of firings of chords.<br>
+Here is a sample demonstrating these joint's lifecycle related methods.<br>
+joint_lifetime.cpp<br>
+<br>
+</body>
+</html>
Added: sandbox/join/libs/join/doc/synopsis_port.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/synopsis_port.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,571 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+  <title>Synopsis of Join Classes</title>
+</head>
+<body>
+<h2>Synopsis of Join Classes<br>
+</h2>
+<ol>
+  <li>classes</li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">namespace
+boost {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    namespace join {</span><br>
+</div>
+<div style="margin-left: 40px;">  <span
+ style="font-weight: bold;">      template
+<typename MsgT, typename QueT> class async;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">       
+template <typename ResT, typename MsgT> class synch;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">       
+template <typename scheduler, size_t max_size>
+class joint;<br>
+        joint joins(joint::spawn_type s =
+0, int hb = 0, const char *name = 0);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">       
+class executor;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+</div>
+<ol start="2">
+  <li>Asynchronous ports<br>
+    <span style="font-weight: bold;"></span></li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename MsgT, typename QueT>  </span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">class async {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      typedef
+MsgT argument_type;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">     
+typedef void result_type;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">     
+......</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;"></span>
+<span style="font-weight: bold;"></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">     
+async(size_t sz=0) : ... { ... }</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      void
+operator()(MsgT msg)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br style="font-weight: bold;">
+<br>
+Asynchronous port objects provide one-way non-blocking message passing
+interfaces.<br>
+<br>
+<span style="font-weight: bold;">typename QueT: <br>
+</span>The queue type async ports used to buffer messages internally
+when no chords are ready to fire. The default QueT is std::deque. QueT
+must satisfy the following interface:<br>
+<div style="margin-left: 40px;">class QueT {<br>
+    void push_back(MsgT);<br>
+    MsgT front();<br>
+    void pop_front();<br>
+    void clear();<br>
+}<br>
+</div>
+<span style="font-weight: bold;"></span><br>
+<span style="font-weight: bold;">async(size_t sz=0):<br>
+Effect: </span>initialize a asynchronous port with underlying
+message queue of <span style="font-weight: bold;">QueT</span> type.
+The queue size <span style="font-weight: bold;">sz</span> is specified
+for flow control; the default value 0 means
+unlimited queue size. Internally there are 2 implementations for this
+class. If the signature is async<void>, there is no
+message to be passed, and template specialization will choose
+a proper implementation without a queue (and its overhead) while
+maintaining the
+same semantics.<span style="font-weight: bold;"><br>
+Throw:<br>
+<br>
+</span><span style="font-weight: bold;">void operator()(MsgT msg): </span><br>
+<span style="font-weight: bold;">Effect:</span> the port api to
+send asynchronous messages; calling threads will return immediately.
+Since async calls will return immediately, and <span
+ style="font-weight: bold;">msg</span> argument may be
+buffered in queues, the <span style="font-weight: bold;">msg</span>
+argument cannot be references or pointers to
+variables on calling stacks which will probably be unwound when async
+calls
+return.<br>
+<span style="font-weight: bold;">Throws</span>: not_in_chord_exception,
+queue_overflow_exception, no_executor_exception<br>
+</div>
+<ol start="3">
+  <li>Synchronous ports<span style="font-weight: bold;"></span></li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename ResT, typename MsgT> <br>
+class synch {<br>
+public:<br>
+      typedef MsgT argument_type;<br>
+      typedef ResT result_type;<br>
+       ......<br>
+ 
+<br>
+      synch(size_t sz=0) : ... { ... }<br>
+</span><span style="font-weight: bold;">     
+ResT operator()(MsgT msg);<br>
+</span><span style="font-weight: bold;">     
+ResT operator()(MsgT msg, boost::xtime &timeout);<br>
+</span><span style="font-weight: bold;">};<br>
+<br>
+</span>Synchronous ports provides blocking message ports.<br>
+<br>
+<span style="font-weight: bold;">synch(size_t sz=0):<br>
+Effect: </span>initialize a synchronous port with underlying thread
+waiting queue.
+The queue size <span style="font-weight: bold;">sz</span> is specified
+for flow control; the default value 0 means
+unlimited queue size. Internally there are four implementations and
+template specialization help choose the most efficient one.<span
+ style="font-weight: bold;"><br>
+Throw:<br>
+<br>
+</span><span style="font-weight: bold;">ResT operator()(MsgT msg):</span><br>
+<span style="font-weight: bold;">Effect: </span>the port
+interface to send a synchronous message;
+calling thread will block here till a reply is available. Since the
+call will block till a result is returned, the <span
+ style="font-weight: bold;">msg</span> argument can be
+anything just as normal function calls, including pointers or
+references to variables on stack.<br>
+<span style="font-weight: bold;">Throws: </span>not_in_chord_exception,
+queue_overflow_exception, or 
+application_specific_exceptions_raised_inside_chord_body<br>
+<br>
+<span style="font-weight: bold;">ResT operator()(MsgT msg, boost::xtime
+&timeout):</span><br>
+<span style="font-weight: bold;">Effect: </span>this is a variant of
+the above interface with a timeout. The call will return either the
+result is available or return with a "synch_time_out_exception" when
+the timeout is reached.<br>
+<span style="font-weight: bold;">Throws:</span><span
+ style="font-weight: bold;"></span> not_in_chord_exception,
+queue_overflow_exception, synch_time_out_exception, 
+application_specific_exceptions_raised_inside_chord_body<span
+ style="font-weight: bold;"> </span>
+</div>
+<ol start="4">
+  <li>Joint<br>
+  </li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<</span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">template <size_t> scheduler =
+sched_first_match,<br>
+</span><span style="font-weight: bold;">   
+            size_t
+max_size = 32></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class joint {<br>
+    typedef function1<void, typename
+joint_base::callable> spawn_type;<br style="font-weight: bold;">
+</span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">    joint(spawn_type spawn =
+0, int heartbeat = 0, const char *name = 0):<br>
+</span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">   
+template
+<typename PortT1, typename PortT2,
+......, typename CallT></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    void chord(PortT1
+&p1, PortT2 &p2, ......, CallT c, int priority=0)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">   
+template
+<typename PortT1, typename PortT2, </span><span
+ style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">typename CallT></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    void
+chord_override(PortT1 &p1, PortT2 &p2, </span><span
+ style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">CallT c</span><span
+ style="font-weight: bold;">, int priority=0</span><span
+ style="font-weight: bold;">)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">   
+template
+<typename PortT1, typename PortT2,
+......></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    void
+chord_remove(PortT1
+&p1, PortT2 &p2, ......)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">   
+......</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">};</span><br>
+Joint can be used as the parent of classes which use
+async<> / synch<> methods and chords to define concurrent
+activities; or it can be used to directly join message ports
+together. Joint maintains the status of message arrival and
+synchronization. <br>
+<br>
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">typename
+spawn_type:</span> <br>
+    the type of execution service used to drive Join
+based asynchronous applications. It could be the async "execute" method
+of the following thread pool based executor, or it could be a adaptor
+to existing applications execution service, as long as it provides a
+functional / functor interface to spawn a task:<br>
+    <span style="font-weight: bold;">void
+operator()(callable t);<br>
+<br>
+</span><span style="font-weight: bold;">max_size:</span> <br>
+    defines the capacity
+of Joint, ie. how many async<> / synch<> ports can be
+defined in this Joint. When max_size <= 32, an integer bitmask is
+used
+to maintain status; while for "larger" Joint, std::bitset<> is
+used for status.<br>
+<br>
+<span style="font-weight: bold;">scheduling_policy: </span><br>
+    Joint
+supports three basic kinds of scheduling policies that decide which
+chord
+will fire when multiple chords become ready at the same time:<br>
+<ul>
+  <li><span style="font-weight: bold;">sched_first_match</span>: fire
+the first chord which is ready to
+fire</li>
+  <li><span style="font-weight: bold;">sched_longest_match</span>: when
+multiple chords are ready to fire,
+fire the one defined with most async<> / synch<> ports.</li>
+  <li><span style="font-weight: bold;">sched_round_robin</span>:
+internally the last chord fired is
+remembered and chords are fired in round-robin style.</li>
+</ul>
+    Additionally Joint supports priority based variants
+of the above three scheduling algorithms.<br>
+<br>
+<span style="font-weight: bold;">joint(spawn_type spawn = 0, int
+heartbeat = 0, const char *name = 0):<br>
+</span>    an Joint is constructed with the following
+two settings:<br>
+<ul>
+  <li><span style="font-weight: bold;">spawn_type spawn</span>: the
+asynchronous port to spawn a new task.
+It could be an adaptor to spawn tasks into existing applications main
+threads (and it must be non-blocking and returns immediately), or the
+execute port of the following executor (thread pool),
+ie.
+the task waiting queues. In Join based applications, concurrency are
+generated by
+chords with only async<> ports whose body will run as a task
+in
+executor thread pool.</li>
+  <li>int <span style="font-weight: bold;">heartbeat: </span>used to
+control the life time of message coordination definitions (chords)
+inside the joint. If a heartbeat ( > 0 ) is provided, the joint's
+internal definitions (chords) will be automatically reset / destroyed
+after its chords have fired "heartbeat" times totally. So after
+heartbeat expires, all of joint's chords will be destroyed and ports
+detached.<br>
+  </li>
+  <li>const char *name: the name of Joint; it is used as a flag to turn
+on/off debug messages, when it is set to a non-NULL value, debugging
+message will be printed out.<br>
+  </li>
+</ul>
+<span style="font-weight: bold;">template
+<typename PortT1, typename PortT2,
+......, typename CallT></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">void chord(PortT1 &p1, PortT2
+&p2, ......, CallT c, int priority=0)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">Effect:
+</span>overloaded chord()
+functions to create chords with different number of
+async<> / synch<> ports. By default Joint has just eight
+overloaded chord() functions to create chords for one to eight
+async<> / synch<> ports,  because any of these
+async<> ports can be a std::vector<async<>>, so in
+fact
+chords can be created for unlimited number of
+async<> ports, while each chord can have at most one
+synchronous port:<br>
+<ul>
+  <li>PortT &p, &p1, ... &pn: async<> / synch<>
+ports which are in the header of chord body; only one of them can be
+synchronous and if we do have one, it must the first port.<br>
+  </li>
+  <li>CallT c: a function, a lambda, class-method, or object-method
+which will consume and process the messages from chord's specified
+ports. Normally the signature of CallT is deduced from chord's port /
+port types:</li>
+</ul>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">PortT1::result_type
+call(PortT1::argument_type, PortT2::argument_type, ...);<br>
+<br>
+</span>Please note that if a port's message type is "void" (such as
+async<void>), we substitue its argument_type as "void_t" which is
+an "empty" struct type defined internally.<span
+ style="font-weight: bold;"><br>
+</span></div>
+<ul>
+  <li>int priority: the scheduling priority of this chord; the
+default value 0 has the highest priority, the greater the number, the
+lower the priority. During scheduling, the chords with higher priority
+will be scanned and scheduled first.</li>
+</ul>
+<span style="font-weight: bold;">Throws: </span>hidden_chord_exception,
+too_many_ports_exception<br>
+<br>
+<span style="font-weight: bold;">template <typename PortT1, typename
+PortT2, </span><span style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">typename CallT></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">void chord_override(PortT1 &p1,
+PortT2 &p2, </span><span style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">CallT c</span><span
+ style="font-weight: bold;">, int priority=0</span><span
+ style="font-weight: bold;">)</span><span style="font-weight: bold;"></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">Effect:  </span>allow overriding
+an existing chord (the chord body is replaced with the new one).
+It is mostly used in child class to override a chord defined in parent
+class. The overridden chord is identified by the set of async<> /
+synch<> ports in its header. It uses the same arguments as
+normal chord() functions. <br>
+<span style="font-weight: bold;">Throws:  </span>chord_override_exception
+(chord not found)<br>
+<br>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename PortT1, typename PortT2,
+......></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">void chord_remove(PortT1
+&p1, PortT2 &p2, ......)</span><br style="font-weight: bold;">
+</div>
+<div style="margin-left: 40px;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">Effect:  </span>remove the chord
+identified by the set of async / synch ports. <br>
+<span style="font-weight: bold;">Throws:  </span>chord_remove_exception
+(chord not found)<br>
+<br>
+</div>
+<ol start="5">
+  <li>Factory functions to create "unnamed" joints</li>
+</ol>
+<span style="font-weight: bold;">    joint
+joins(joint::spawn_type s = 0, int hb = 0, const char *name =
+0);   <br>
+      </span>or a more general templated function:<br>
+<span style="font-weight: bold;">    template <</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">     
+template <size_t> class scheduler, </span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      size_t
+max_size</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    ></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">   
+joint_t<scheduler, max_size> joins_t(joint::spawn_type s = 0, int
+hb = 0, const char *name = 0);<br>
+    </span>Often joint is used directly to join message
+ports together, and we don't need direct reference to the "joint"
+object. In these cases, the factory functions can be used to create
+unnamed joints, such as:<br>
+<span style="font-weight: bold;">    joins(exec)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">      
+.chord(async1, async2, handler1)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">      
+.chord(async2, async3, handler2);</span><span style="font-weight: bold;"></span><br>
+<span style="font-weight: bold;"></span>
+<div style="margin-left: 40px;"><span style="font-weight: bold;"></span></div>
+<ol start="6">
+  <li>executor</li>
+</ol>
+<div style="margin-left: 40px; font-weight: bold;">template
+<template <size_t> class scheduler=sched_first_match, size_t
+sz=32><br>
+class executor :
+public Joint {<br>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">   
+executor(int
+num_threads, const char *name = NULL);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    async<task>
+execute;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">   
+synch<void,void> shutdown;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">};</span><br>
+Chords with only
+async<> ports will run its body as an asynchronous task,
+either in a newly spawned thread, or in the thread
+pool of executor, or in exisiting applications' execution services
+(such as Boost.Asio's
+main threads). So executor is the "engine" of Join based
+applications. Different execution strategies can be used, such as a
+thread per request or thread pool, as long as it provides a
+functional / functor interface to spawn a task: <span
+ style="font-weight: bold;">void
+operator()(task t). </span><br>
+Here the default executor class is thread
+pool based and defined using
+async<> / synch<> ports and chords,  and is a good
+sample
+of
+Join.<br>
+<br style="font-weight: bold;">
+<span style="font-weight: bold;">executor(int num_threads, const char
+*name = NULL)</span>: <br>
+<ul>
+  <li>int num_threads: number of worker threads in executor's thread
+pool</li>
+  <li>const char *name: name of executor, a flag to turn on/off
+debugging messages<br>
+  </li>
+</ul>
+<span style="font-weight: bold;">async<task> execute</span>: an
+asynchronous port for applications to submit tasks; never block; it
+is the default <span style="font-weight: bold;">task queue</span> of
+executor.<br>
+<span style="font-weight: bold;">synch<void()> shutdown</span>:
+a synchronous port for application code to wait for executor to
+shutdown (and its threads to exit). All chords defined with <span
+ style="font-weight: bold;">shutdown</span> have a low priority (1)
+than normal (0), so shutdown will return when all submited tasks have
+been finished and all worker threads exit.<br>
+</div>
+<ol start="7">
+  <li>support of group/bundle of async<> ports:</li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">boost::std::vector<async<</span><span
+ style="font-weight: bold;">MsgT</span><span style="font-weight: bold;">>
+><br>
+</span><span style="font-weight: bold;"></span>In Cω  the chords
+are defined with a  few async/synch ports with distinct names.
+What if
+we want to
+define chords with a group / std::vector of asynchronous message ports
+of the same message type? Inspired by
+similar abstractions in CCR[4] and C#.Joins[3], Join allows the
+definition of an std::vector of async ports and their participation
+in chord as a whole. Since there are at most one synch port per
+chord, so there is no support for std::vector of synch ports. The
+argument type for vector of async ports will be vector<MsgT>.
+Please
+refer to the Group of Asynchronous port
+tutotail (join_many.cpp) for their usage. <span
+ style="font-weight: bold;"></span></div>
+<ol start="8">
+  <li>exceptions</li>
+</ol>
+<div style="margin-left: 40px;">
+<p>In Cω , many of the following errors related to the definitions of
+async/synch ports and chords will be found by the compiler. Since
+Join is implemented as a library, these errors can only be
+reported as the following exceptions during runtime:</p>
+<p><span style="font-weight: bold;">class
+join_exception : public
+std::exception</span></p>
+<p style="margin-left: 40px;"><span style="font-weight: bold;"></span>the
+base class of all Join related exceptions</p>
+<p><span style="font-weight: bold;">class not_in_chord_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When a async / synch port is
+declared
+and no chord include it, this port has no body defined similar
+to  pure virtual port in C++ abstract classes (pure virtual ports
+have only signature / declaration, no body). When this class
+is
+instantiated and client code calls this port, not_in_chord_exception
+will be thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">class double_association_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">Every async / synch port should only
+be associated with one Joint. When attempting to use the port in the
+chords of more than one Joints, this exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">class
+queue_overflow_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">For basic flow control, async / synch
+port can be constructed with a limited queue size. When the buffered
+(unprocessed) messages exceed the queue size, this exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">class no_executor_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When an Joint contains chords with only
+async ports but without an executor associated, this exception is
+thrown.<br>
+</div>
+<p><span style="font-weight: bold;">class hidden_chord_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When Joint's scheduling policy is
+fire_as_soon_as_possible (default), if the definition of a new chord
+contains all the ports of another chord, or vice versa, this exception
+will be thrown.<br>
+</div>
+<p><span style="font-weight: bold;">class too_many_ports_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When defining a new chord, the total
+number of async / synch ports (accumulated thru all the defined
+chords) exceeds the max_size of Joint, this exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span></p>
+<p><span style="font-weight: bold;">class chord_override_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When chord_override() is called and no
+existing chord found with the same set of async / synch ports, this
+exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;">class chord_remove_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When chord_remove() is called and no
+existing chord found with the same set of async / synch ports, this
+exception is thrown.<br>
+<br>
+</div>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">class
+synch_not_1st_exception : public join_exception </span><br
+ style="font-weight: bold;">
+<p style="margin-left: 40px;">When chords are defined with synchronous
+port, however the synchronous port is not used as the first port
+of chord, this exception is thrown.<br>
+</p>
+<span style="font-weight: bold;"> class single_synch_exception : public
+join_exception </span><br>
+<p style="margin-left: 40px;">When chords are defined with more than
+one synchronous ports, this exception is thrown.<br>
+</p>
+<span style="font-weight: bold;"> class synch_time_out_exception :
+public
+join_exception </span><br>
+<p style="margin-left: 40px;">The call of a synchronous port is timed
+out.<br>
+</p>
+</div>
+<div style="margin-left: 40px;"></div>
+</body>
+</html>
Added: sandbox/join/libs/join/doc/tutorials.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/tutorials.html	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,205 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+  <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+  <title>Chords and Joints</title>
+</head>
+<body>
+<h2>Samples</h2>
+<a href="#buffer"><br>
+Thread safe buffer and message queue</a>
+<br>
+Handle multiple message flows
+<br>
+Using channels as the interface between components<br>
+Joint's life cycle<br>
+<h3><a name="buffer"></a>Thread safe buffer and message queue</h3>
+a thread safe buffer can be defined in various manners.<br>
+<br>
+the most explicit version:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename V></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer: public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<V> put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<V,void> get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    chord(get, put,
+&buffer::chord_body);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  V chord_body(void_t g, V p) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    return p;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer.cpp<br>
+<br>
+simplified version using boost::lambda or boost::phoenix and "joins()"
+factory function:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename V></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<V> put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<V,void> get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    joins().chord(get,
+put, lambda::_2);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer_lambda.cpp<br>
+buffer_phoenix.cpp<br>
+<br>
+Using boost::tuples, boost::phoenix, and "joins()" factory function, we
+can create message queue (essentially a thread safe buffer) in JoCaml
+style:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename V></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">boost::tuple<async<V>,synch<V,void>
+> create_msg_que() {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  async<V> send;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<V,void> recv;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  joins().chord(recv, send, arg2);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  return boost::make_tuple(send,
+recv);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}   <br>
+...<br>
+  async<int> send;<br>
+  synch<int, void> recv;<br>
+  boost::tie(send, recv) = create_msg_que<int>();<br>
+</span></div>
+producer_consumer.cpp<br>
+<h3><a name="flows"></a>Handle multiple message flows</h3>
+A simple demo of joint handling 3 message flows and performing
+different processing on messages when they are available in different
+channels:<br>
+<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">flows_bundle
+make_data_channels(joint::spawn_type e) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<int> flow1;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<int> flow2;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  async<int> flow3;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  joins(e)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    .chord(flow1,
+flow2, std::cout <<  (arg1 + arg2) << "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    .chord(flow2,
+flow3, std::cout <<  (arg1 - arg2) << "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    .chord(flow1,
+flow3, std::cout <<  (arg1 * arg2) << "\n");</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  return boost::make_tuple(flow1,
+flow2, flow3);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+</div>
+flows.cpp<br>
+<br>
+a sample of joining a vector of message flows into one output flow:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+<typename T></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class join_many : public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  vector<async<T> >
+inputs;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  synch<vector<T>,
+void> output;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">  join_many(int num) :
+inputs(num) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">    chord(output,
+inputs, &join_many::chord_body);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  vector<T>
+chord_body(void_t out, vector<T> in) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">    return in;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">  }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+join_many.cpp<br>
+<h3><a name="chain"></a>Using channels as the interface between
+components</h3>
+A sample of a chain of nodes, each node exposes its input and output
+interfaces as channels.<br>
+Each node reads from its input, does some transformation of data, wait
+a second and send it to output interface, all these are performed in
+executor's thread pool.<br>
+The way to connect one node (A) to the next (B) is simple: 
+A.output = B.input.<br>
+<div style="margin-left: 40px; font-weight: bold;">template
+<typename T><br>
+class node {<br>
+  int my_no;<br>
+  boost::function<T(T)> func_;<br>
+public:<br>
+  async<T> input;<br>
+  async<T> output;<br>
+  bool stop;<br>
+  node(joint::spawn_type e, int n, boost::function<T(T)> f)
+: <br>
+    my_no(n), func_(f), stop(false) {<br>
+    joins(e).chord(input, bind(&node::proc, this,
+_1));<br>
+  }<br>
+private:<br>
+  void proc(T in) {<br>
+    try {<br>
+      if(!stop) {<br>
+        T o = func_(in);<br>
+        cout << "node ["
+<< my_no << "] recv [" << in <<"] and send ["
+<< o << "]" << endl;<br>
+        thread_sleep(1);<br>
+        output(o);<br>
+      }<br>
+    } catch (join_exception &e) {<br>
+      cout << "node [" << my_no
+<< "] got exception: " << e.what() << "\n";<br>
+    }<br>
+  }<br>
+};<br>
+</div>
+chain.cpp<br>
+<br>
+<h3><span style="font-weight: bold;"><a name="lifecycle"></a>Joint's
+life cycle</span></h3>
+After the initial definition, a joint's set of chords can modified thru
+methods: chord_override(), chord_remove() or reset(). And if a joint's
+heartbeat is set during its creation, its set of chords will be
+auto-destroyed after "heartbeat" number of firings of chords.<br>
+Here is a sample demonstrating these joint's lifecycle related methods.<br>
+joint_lifetime.cpp<br>
+<br>
+</body>
+</html>
Added: sandbox/join/libs/join/examples/Jamfile.v2
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/Jamfile.v2	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,30 @@
+
+project
+  : requirements
+    <library>/boost/thread//boost_thread
+    <define>BOOST_ALL_NO_LIB=1
+    <threading>multi
+  ;
+
+obj buffer.obj : buffer.cpp ;
+exe buffer : buffer.obj ;
+obj buffer_phoenix.obj : buffer_phoenix.cpp ;
+exe buffer_phoenix : buffer_phoenix.obj ;
+obj buffer_lambda.obj : buffer_lambda.cpp ;
+exe buffer_lambda : buffer_lambda.obj ;
+obj bounded_buffer.obj : bounded_buffer.cpp ;
+exe bounded_buffer : bounded_buffer.obj ;
+obj join_many.obj : join_many.cpp ;
+exe join_many : join_many.obj ;
+obj flows.obj : flows.cpp ;
+exe flows : flows.obj ;
+obj prime_sieve.obj : prime_sieve.cpp ;
+exe prime_sieve : prime_sieve.obj ;
+obj joint_lifetime.obj : joint_lifetime.cpp ;
+exe joint_lifetime : joint_lifetime.obj ;
+obj chain.obj : chain.cpp ;
+exe chain : chain.obj ;
+obj producer_consumer.obj : producer_consumer.cpp ;
+exe producer_consumer : producer_consumer.obj ;
+obj async_call_ret.obj : async_call_ret.cpp ;
+exe async_call_ret : async_call_ret.obj ;
Added: sandbox/join/libs/join/examples/async_call_ret.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/async_call_ret.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,88 @@
+//
+// async_call_ret.cpp
+//
+// adapted from Cw sample
+//
+#include <boost/join/join.hpp>
+#include "boost/tuple/tuple.hpp"
+#include <iostream>
+
+using namespace std;
+using namespace boost;
+using namespace boost::join;
+
+logger log1("log");
+
+//server interface: expose an async ports taking requests 
+//in the form of a string name and a response ports
+class IService {
+public:
+  async<tuple<string, async<string> > > Service;
+};
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+//server class, implement the server interface:
+//for each incoming request <name, respChan>, sleep a few seconds
+//send "name is done" to respChan
+class MyService : public joint, public IService {
+  int n;
+public:
+  MyService(int m, joint::spawn_type e) : joint(e), n(m) {
+    chord(Service, &MyService::service_fun);
+  }
+  void service_fun(tuple<string, async<string> > req) {
+    string srvName = tuples::get<0>(req);
+    async<string> replyChan = tuples::get<1>(req);
+    for(int i=0; i<n; i++) {
+      log1.stream() << srvName << " does " << i << logger::endl;
+      thread_sleep((n%2)?1:2);
+    }
+    replyChan(srvName+" is done");
+  }
+};
+
+//clients use Join2 to wait for responses from 2 servers
+class Result2 : public joint {
+public:
+  async<string> first;
+  async<string> second;
+  synch<tuple<string,string>,void> Wait;
+  Result2() {
+    chord(Wait, first, second, &Result2::wait_body);
+  }
+  tuple<string,string> wait_body(void_t w, string f, string s) {
+    return make_tuple(f,s);
+  }
+};
+
+int main(int argc, char **argv) {
+  executor exec(2);
+  //create 2 server threads
+  MyService s1(5, exec.execute);
+  MyService s2(10, exec.execute);
+  //main thread will play the role of clients
+  //create a Result2 for waiting for results from 2 servers
+  Result2 j;
+  //send to 2 servers requests with response ports packed inside
+  s1.Service(make_tuple("Service 1", j.first));
+  s2.Service(make_tuple("Service 2", j.second));
+  //client idle
+  for(int i=0; i<7; i++) {
+    log1.stream() << "client idle " << i << logger::endl;
+    thread_sleep(1);
+  }
+  string x,y;
+  //client blocking here waiting for responses from servers
+  log1.stream() << "client block wait " << logger::endl;
+  tie(x,y) = j.Wait();
+  log1.stream() << "first result = " << x << ", second result = " << y << logger::endl;
+  exec.shutdown();
+  return 0;
+}
+
Added: sandbox/join/libs/join/examples/bounded_buffer.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/bounded_buffer.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,83 @@
+//
+// bounded_buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+ 
+using namespace boost::join;
+
+logger log1("log");
+
+template <typename T>
+class BufferN : public joint {
+private:
+  async<void> token;
+  async<T> value;
+
+public:
+  synch<void,T> put;
+  synch<T,void> get;
+
+  BufferN(int sz) {
+    chord(put, token, &BufferN::put_cb);
+    chord(get, value, &BufferN::get_cb);
+    for(int i=0; i<sz; i++) 
+      token();  
+  }
+  void put_cb(T put, void_t) {
+    value(put);
+  }
+  T get_cb(void_t, T val) {
+    token();
+    return val;
+  }
+};
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+void producer(synch<void,int> put) {
+  for(int i=0; i<2000000; i++) {
+    put(i);
+    log1.stream() << "producer sends [" << i << "]" << logger::endl;
+    //thread_sleep(2);
+  }
+}
+void consumer1(synch<int,void> get) {
+  for(int i=0; i<1000000; i++) {
+    log1.stream() << "consumer1 recvs [" << get() << "]" << logger::endl;      
+    //thread_sleep(3);
+  }
+}
+void consumer2(synch<int,void> get) {
+  for(int i=0; i<1000000; i++) {
+    log1.stream() << "consumer2 recvs [" << get() << "]" << logger::endl;      
+    //thread_sleep(1);
+  }
+}
+
+int main(int argc, char **argv) {
+  executor exec(3);
+
+  //create bounded buffer
+  BufferN<int> buf(5);
+  
+  //create test tasks
+  exec.spawn(boost::bind(consumer1, buf.get));
+  exec.spawn(boost::bind(consumer2, buf.get));
+  exec.spawn(boost::bind(producer, buf.put));
+
+  exec.shutdown();
+  return 0;
+}
+
Added: sandbox/join/libs/join/examples/buffer.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/buffer.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,77 @@
+//
+// buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+
+using namespace boost;
+using namespace boost::join;
+
+logger log1("log");
+
+template <typename V>
+class buffer: public joint {
+public:
+  async<V> put;
+  synch<V,void> get;
+  buffer() {
+    chord(get, put, &buffer::chord_body);
+  }
+  V chord_body(void_t g, V p) {
+    return p;
+  }
+};
+
+void hello_world(buffer<std::string> &b) {
+  b.put("hello");
+  b.put("world");
+  log1.msg(b.get()+" ");
+  log1.msg(b.get());
+}
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+void producer(async<std::string> put) {
+  std::ostringstream ostr;
+  for(int i=0; i<5; i++) {
+    ostr << i;
+    put(ostr.str());
+    log1.stream() << "producer sends [" << i << "]" << logger::endl;
+    ostr.str("");
+    thread_sleep(1);
+  }
+}
+void consumer(synch<std::string,void> get) {
+  for(int i=0; i<5; i++) {
+    log1.stream() << "consumer recvs [" << get() << "]" << logger::endl;      
+    thread_sleep(2);
+  }
+}
+
+int main(int argc, char **argv) {
+  buffer<std::string> b;
+  hello_world(b);
+
+  executor exec(2);  //spawn 2 threads for executor thread pool
+
+  //spawn test tasks
+  exec.spawn(boost::bind(consumer, b.get));
+  exec.spawn(boost::bind(producer, b.put));
+
+  exec.shutdown();
+  return 0;
+}
Added: sandbox/join/libs/join/examples/buffer_lambda.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/buffer_lambda.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,76 @@
+//
+// buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+#include <boost/lambda/lambda.hpp>
+
+using namespace boost;
+using namespace boost::join;
+using namespace boost::lambda;
+
+logger log1("log");
+
+template <typename V>
+class buffer {
+public:
+  async<V> put;
+  synch<V,void> get;
+  buffer() {
+    joins().chord(get, put, lambda::_2);
+  }
+};
+
+void hello_world(buffer<std::string> &b) {
+  b.put("hello");
+  b.put("world");
+  log1.msg(b.get()+" ");
+  log1.msg(b.get());
+}
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+void producer(async<std::string> put) {
+  std::ostringstream ostr;
+  for(int i=0; i<5; i++) {
+    ostr << i;
+    put(ostr.str());
+    log1.stream() << "producer sends [" << i << "]" << logger::endl;
+    ostr.str("");
+    thread_sleep(1);
+  }
+}
+void consumer(synch<std::string,void> get) {
+  for(int i=0; i<5; i++) {
+    log1.stream() << "consumer recvs [" << get() << "]" << logger::endl;      
+    thread_sleep(2);
+  }
+}
+
+int main(int argc, char **argv) {
+  buffer<std::string> b;
+  hello_world(b);
+
+  executor exec(2);  //spawn 2 threads for executor thread pool
+
+  //spawn test tasks
+  exec.spawn(boost::bind(consumer, b.get));
+  exec.spawn(boost::bind(producer, b.put));
+
+  exec.shutdown();
+  return 0;
+}
Added: sandbox/join/libs/join/examples/buffer_phoenix.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/buffer_phoenix.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,76 @@
+//
+// buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+#include <boost/spirit/include/phoenix1.hpp>
+
+using namespace boost;
+using namespace boost::join;
+using namespace phoenix;
+
+logger log1("log");
+
+template <typename V>
+class buffer {
+public:
+  async<V> put;
+  synch<V,void> get;
+  buffer() {
+    joins().chord(get, put, arg2);
+  }
+};
+
+void hello_world(buffer<std::string> &b) {
+  b.put("hello");
+  b.put("world");
+  log1.msg(b.get()+" ");
+  log1.msg(b.get());
+}
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+void producer(async<std::string> put) {
+  std::ostringstream ostr;
+  for(int i=0; i<5; i++) {
+    ostr << i;
+    put(ostr.str());
+    log1.stream() << "producer sends [" << i << "]" << logger::endl;
+    ostr.str("");
+    thread_sleep(1);
+  }
+}
+void consumer(synch<std::string,void> get) {
+  for(int i=0; i<5; i++) {
+    log1.stream() << "consumer recvs [" << get() << "]" << logger::endl;      
+    thread_sleep(2);
+  }
+}
+
+int main(int argc, char **argv) {
+  buffer<std::string> b;
+  hello_world(b);
+
+  executor exec(2);  //spawn 2 threads for executor thread pool
+
+  //spawn test tasks
+  exec.spawn(boost::bind(consumer, b.get));
+  exec.spawn(boost::bind(producer, b.put));
+
+  exec.shutdown();
+  return 0;
+}
Added: sandbox/join/libs/join/examples/chain.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/chain.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,96 @@
+//
+// chain.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+#include <boost/spirit/include/phoenix1.hpp>
+
+using namespace boost;
+using namespace boost::join;
+using namespace std;
+using namespace phoenix;
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+//the node in the chain:
+//read data from input, transform its value, wait a sec 
+//and send the new value to output
+//all these are running in executor thread pool
+template <typename T>
+class node {
+  int my_no;
+  boost::function<T(T)> func_;
+public:
+  async<T> input;
+  async<T> output;
+  bool stop;
+
+  node(joint::spawn_type e, int n, boost::function<T(T)> f) : 
+    my_no(n), func_(f), stop(false) {
+    joins(e).chord(input, bind(&node::proc, this, _1));
+  }
+
+private:
+  void proc(T in) {
+    try {
+      if(!stop) {
+        T o = func_(in);
+        cout << "node [" << my_no << "] recv [" << in <<"] and send [" << o << "]" << endl;
+        thread_sleep(1);
+        output(o);
+      }
+    } catch (join_exception &e) {
+      cout << "node [" << my_no << "] got exception: " << e.what() << "\n";
+    }
+  }
+};
+
+enum {
+  num_nodes = 10,
+  num_data = 5
+};
+
+int main(int, char**) {
+  executor exec(3);
+  vector<node<int>*> nodes;
+  nodes.reserve(num_nodes);
+
+  //create nodes
+  for(int i=0; i<num_nodes; i++) {
+    nodes.push_back(new node<int>(exec.execute, i, arg1+val(1)));
+  }
+  
+  //setup chaining, please note we should connect output to input
+  for(int j=0;j<num_nodes;j++) {
+    if(j<(num_nodes-1)) {
+      nodes[j]->output = nodes[j+1]->input;
+    }
+  }
+  nodes[num_nodes-1]->output = nodes[0]->input;
+
+  //pump some data into chain
+  for(int i=0;i<num_data;i++) {
+    nodes[0]->input(i);
+    thread_sleep(1);
+  }
+
+  //wait for while to see the chain-reaction
+  thread_sleep(20);
+
+  //stop the chain
+  for(int i=0; i<num_nodes; i++)
+    nodes[i]->stop = true;
+  exec.shutdown();
+  return 0;
+}
Added: sandbox/join/libs/join/examples/flows.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/flows.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,78 @@
+//
+// flows.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+#include <boost/spirit/include/phoenix1.hpp>
+#include "boost/tuple/tuple.hpp"
+
+using namespace boost;
+using namespace boost::join;
+using namespace phoenix;
+
+logger log1("log");
+
+typedef boost::tuple<async<int>, async<int>, async<int> > flows_bundle;
+
+flows_bundle make_data_channels(joint::spawn_type e) {
+  async<int> flow1;
+  async<int> flow2;
+  async<int> flow3;
+  joins_t<sched_first_match,32>(e)
+    .chord(flow1, flow2, std::cout << val('(') << arg1 << " + " << arg2 << ") = " << (arg1 + arg2) << "\n")
+    .chord(flow2, flow3, std::cout << val('(') << arg1 << " - " << arg2 << ") = " << (arg1 - arg2) << "\n")
+    .chord(flow1, flow3, std::cout << val('(') << arg1 << " * " << arg2 << ") = " << (arg1 * arg2) << "\n");
+  return boost::make_tuple(flow1, flow2, flow3);
+}
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+void producer1(async<int> chan1) {
+  for(int i=0; i<12; i++) {
+    chan1(i);
+    log1.stream() << "producer1 sends [" << i << "]" << logger::endl;
+    thread_sleep(1);
+  }
+}
+void producer2(async<int> chan2) {
+  for(int i=0; i<10; i++) {
+    chan2(i);
+    log1.stream() << "producer2 sends [" << i << "]" << logger::endl;      
+    thread_sleep(2);
+  }
+}
+void producer3(async<int> chan3) {
+  for(int i=0; i<12; i++) {
+    chan3(i);
+    log1.stream() << "producer3 sends [" << i << "]" << logger::endl;      
+    thread_sleep(1);
+  }
+}
+
+int main(int argc, char **argv) {
+  executor exec(5);
+
+  //create data channels
+  flows_bundle flows = make_data_channels(exec.execute);
+
+  //spawn test tasks
+  exec.spawn(boost::bind(producer1, tuples::get<0>(flows)));
+  exec.spawn(boost::bind(producer2, tuples::get<1>(flows)));
+  exec.spawn(boost::bind(producer3, tuples::get<2>(flows)));
+
+  thread_sleep(10);
+  exec.shutdown();
+  return 0;
+}
+
Added: sandbox/join/libs/join/examples/join_many.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/join_many.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,83 @@
+//
+// join_many.cpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+#include <vector>
+
+using namespace std;
+using namespace boost;
+using namespace boost::join;
+
+logger log1("log");
+
+// join multiple input streams into one output stream
+template <typename T>
+class join_many : public joint {
+public:
+  vector<async<T> > inputs;
+  synch<vector<T>, void> output;
+  join_many(int num) : inputs(num) {
+    chord(output, inputs, &join_many::chord_body);
+  }
+  vector<T> chord_body(void_t out, vector<T> in) {
+    return in;
+  }
+};
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+enum {
+  num_chan = 4,
+  num_test = 5
+};
+
+void collect(synch<vector<int>,void> output) {
+  vector<int> results;
+  for(int i=0; i<num_test; i++) {
+    results = output();
+    log1.stream() << "output" << i << " = " << logger::end;
+    for(size_t j=0;j<num_chan;j++) 
+      log1.stream() << results[j] << " " << logger::end;
+    log1.stream() << logger::endl;
+  }
+}
+
+void distributor(int p, async<int> input) {
+  int start = p*num_test;
+  int end = start+num_test;
+  for(int i=start; i<end; i++) {
+    log1.stream() << "inputer[" << p << "] sends [" << i << "] and wait..." << logger::endl;
+    input(i);
+    thread_sleep((p+1)%3);
+  }
+}
+
+int main(int argc, char **argv) {
+  executor exec(num_chan);  //spawn 4 threads for executor thread pool
+
+  join_many<int> merger(num_chan);
+
+  //spawn distributor tasks
+  for(int i=0; i<num_chan; i++)
+    exec.spawn(boost::bind(distributor, i, merger.inputs[i]));
+
+  //main thread wait and collect results
+  log1.msg("main thread starts waiting...");
+  collect(merger.output);
+  log1.msg("main thread finish waiting...");
+
+  exec.shutdown();
+  return 0;
+}
Added: sandbox/join/libs/join/examples/joint_lifetime.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/joint_lifetime.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,153 @@
+//
+// jocaml1.cpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <boost/bind.hpp>
+
+using namespace boost::join;
+using namespace boost;
+using namespace std;
+
+logger log1("log");
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+//define 2 plain functions as message handlers
+void apple_pie(string a, void_t) {
+  log1.stream() << a << " apple pie" << logger::endl;
+}
+
+void raspberry_pie(string r, void_t) {
+  log1.stream() << r << " raspberry pie" << logger::endl;
+}
+
+//define a class using its object's methods as message handlers
+class fruit_pie {
+  int count;
+public:
+  fruit_pie(int c) : count(c) {}
+  void apple(string a, void_t) {
+    log1.stream()  << a << " apple pie " << count << logger::endl;
+  }
+  void raspberry(string r, void_t) {
+    log1.stream()  << r << " raspberry pie " << count << logger::endl;
+  }
+  void blueberry(string r, void_t) {
+    log1.stream()  << r << " blueberry pie " << count << logger::endl;
+  }
+  void strange(string a, void_t) {
+    log1.stream()  << "STRANGE " << a << " apple pie " << count << logger::endl;
+  }
+};
+
+
+int main(int argc, char **argv) {
+  //define a few asynchronous message ports
+  async<string> apple;
+  async<string> raspberry;
+  async<string> blueberry;
+  async<void> pie;
+
+  //create an executor with 2 threads in pool
+  executor exec(2);  
+
+  //test1: a new joint obj is created, used, reused and destroyed at end of scope
+  try 
+  {
+    //define an joint/joint object with a few chords/join-patterns to specify 
+    //what to do with messages at ports : apple(), raspberry() and pie()
+    joint joins(exec.execute);
+    joins
+      .chord(apple, pie, apple_pie)
+      .chord(raspberry, pie, raspberry_pie);
+
+    //send a few messages to test
+    pie(); pie();
+    raspberry("green");
+    apple("red");
+
+    //clear all chords (join patterns or coordination definitions) in "joins" to reuse it 
+    joins.reset(); 
+
+    //redefine 2 chords with different processing code
+    fruit_pie fp1(1);
+    joins.chord(apple, pie, bind(&fruit_pie::apple, fp1, _1, _2))
+         .chord(raspberry, pie, bind(&fruit_pie::raspberry, fp1, _1, _2));
+    
+    //send more test messages
+    pie(); pie();
+    raspberry("purple");
+    apple("yellow");
+
+    //override chord <apple,pie> to print diff msg
+    joins.chord_override(apple, pie, bind(&fruit_pie::strange, fp1, _1, _2));
+    apple("white"); pie();
+
+    //add a new chord into existing chords
+    joins.chord(blueberry, pie, bind(&fruit_pie::blueberry, fp1, _1, _2));
+    blueberry("dark"); pie();
+
+    try {
+      //test unbound async methods/ports
+      //an exception should be thrown here since 
+      //all joiners/joints are gone and async port "pie" 
+      //is not associated with any joint
+      pie.detach();
+      pie(); 
+    }
+    catch (join_exception &e) {
+      log1.stream()  << "Caught unbound-pie exception: " << e.what() << logger::endl;
+    }
+
+    //must reset this joint to reuse the ports with another joint
+    joins.reset();
+  }
+  catch (join_exception &e) {
+    log1.stream()  << "Caught exception: " << e.what() << logger::endl;
+  }
+
+  //test2: another "unamed" joint object is used
+  try
+  {
+    //define an joint object with 3 heartbeats (its chords can fire 3 times, then it will self-destroy)
+    fruit_pie fp2(2);
+    joins(exec.execute, 3)
+      .chord(apple, pie, bind(&fruit_pie::apple, fp2, _1, _2))
+      .chord(raspberry, pie, bind(&fruit_pie::raspberry, fp2, _1, _2))
+      .chord(blueberry, pie, bind(&fruit_pie::blueberry, fp2, _1, _2));
+
+    //send some test messages
+    pie(); pie();
+    raspberry("sour");
+    apple("dull");
+    blueberry("sweet"); pie();
+
+    //since joint heartbeat count expires here, it should be destroyed and all ports detached,
+    //calling ports now will throw exception
+    raspberry("bitter");
+  }
+  catch (join_exception &e) {
+    log1.stream()  << "Caught bitter-raspberry exception: " << e.what() << logger::endl;
+  }
+
+  try {
+    exec.shutdown();
+  }
+  catch(join_exception &e) {
+    log1.stream()  << "Caught exception: " << e.what() << logger::endl;
+  }
+
+  return 0;
+}
+ 
Added: sandbox/join/libs/join/examples/prime_sieve.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/prime_sieve.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,80 @@
+//
+// prime_sieve.cpp
+//
+// Copyright (c) 2007 - 2009  Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/xtime.hpp>
+#include <string>
+#include <iostream>
+
+using namespace std;
+using namespace boost::join;
+
+logger log1("log");
+
+typedef rr_executor exec_type;
+
+class prime_task : public joint {
+  int my_prime_;
+  int my_no_;
+  prime_task * next_;
+  exec_type *e_;
+  //using async<> methods to represent states
+  async<void> init;
+  async<void> ready;
+public:
+  async<int> sieve;
+  void sieve_init_body(int value, void_t init) {
+    my_prime_ = value;
+    log1.stream() << "------ prime_task [" << my_no_ << "] found prime = " << my_prime_ << logger::endl;
+    next_ = new prime_task(e_, my_no_+1); //create the next task
+    ready();
+  }
+  void sieve_ready_body(int value, void_t redy) {
+    int val = value;
+    ready(); //allow processing the next incoming number as soon as possible
+    if (val % my_prime_) { //not my multiples
+      next_->sieve(val);
+    } else { //my multiples
+      log1.stream() << "prime_task [" << my_no_ << "] drop " << val << logger::endl;
+    }
+  }
+  prime_task(exec_type *e, int no) : 
+    joint(e->task_queue(no)), my_prime_(-1), my_no_(no), next_(0), e_(e) {
+    chord(sieve, init, &prime_task::sieve_init_body);
+    chord(sieve, ready, &prime_task::sieve_ready_body);
+    init(); //initialize joint's state
+  }
+  ~prime_task() {
+    if (next_ != 0) delete next_;
+  }
+};
+
+int main(int argc, char **argv) {
+  exec_type exec(4);
+
+  int max_num;
+  if (argc > 1)
+    max_num = atoi(argv[1]); 
+  else
+    max_num = 1000;
+
+  log1.stream() << "find primes in range [2-" << max_num << "]" << logger::endl;
+
+  //create prime_task0 
+  prime_task first_task(&exec,0);
+
+  //generate integer series to be sieved and feed them to the chain of tasks
+  for(int i=2; i<=max_num; i++)
+    first_task.sieve(i);
+
+  log1.msg("main thread shutdown...");
+  exec.shutdown();
+  return 0;
+}
Added: sandbox/join/libs/join/examples/producer_consumer.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/producer_consumer.cpp	2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,67 @@
+//
+// producer_consumer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+#include <boost/spirit/include/phoenix1.hpp>
+#include "boost/tuple/tuple.hpp"
+
+using namespace boost;
+using namespace boost::join;
+using namespace phoenix;
+
+logger log1("log");
+
+void thread_sleep(int sec) {
+    boost::xtime xt;
+    boost::xtime_get(&xt, boost::TIME_UTC);
+    xt.sec += sec;
+    boost::thread::sleep(xt);
+}
+
+template <typename V>
+boost::tuple<async<V>,synch<V,void> > create_msg_que() {
+  async<V> send;
+  synch<V,void> recv;
+  joins().chord(recv, send, arg2);
+  return boost::make_tuple(send, recv);
+}
+
+void producer(async<int> send) {
+  for(int i=0;i<10;i++) {
+    send(i);
+    log1.stream() << "producer sends: " << i << logger::endl;
+    thread_sleep(1);
+  }
+}
+
+void consumer(synch<int,void> recv) {
+  for(int i=0; i<10; i++) {
+    log1.stream() << "consumer recvs: " << recv() << logger::endl;
+  }
+}
+
+int main(int argc, char **argv) {
+  executor exec(2);  //spawn 2 threads for executor thread pool
+
+  //create msg que
+  async<int> send;
+  synch<int, void> recv;
+  boost::tie(send, recv) = create_msg_que<int>();
+
+  //spawn prod/consum tasks
+  exec.spawn(boost::bind(consumer, recv));
+  exec.spawn(boost::bind(producer, send));
+
+  exec.shutdown();
+  return 0;
+}