$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
From: chris_at_[hidden]
Date: 2007-11-07 23:10:14
Author: chris_kohlhoff
Date: 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
New Revision: 40919
URL: http://svn.boost.org/trac/boost/changeset/40919
Log:
Eliminate the need for an extra thread to perform timer dispatching.
Text files modified: 
   trunk/boost/asio/deadline_timer_service.hpp     |     3                                         
   trunk/boost/asio/detail/win_iocp_io_service.hpp |   197 +++++++++++++++++++++++++++++++++++++++ 
   2 files changed, 195 insertions(+), 5 deletions(-)
Modified: trunk/boost/asio/deadline_timer_service.hpp
==============================================================================
--- trunk/boost/asio/deadline_timer_service.hpp	(original)
+++ trunk/boost/asio/deadline_timer_service.hpp	2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
@@ -29,6 +29,7 @@
 #include <boost/asio/detail/kqueue_reactor.hpp>
 #include <boost/asio/detail/select_reactor.hpp>
 #include <boost/asio/detail/service_base.hpp>
+#include <boost/asio/detail/win_iocp_io_service.hpp>
 
 namespace boost {
 namespace asio {
@@ -63,7 +64,7 @@
   // The type of the platform-specific implementation.
 #if defined(BOOST_ASIO_HAS_IOCP)
   typedef detail::deadline_timer_service<
-    traits_type, detail::select_reactor<true> > service_impl_type;
+    traits_type, detail::win_iocp_io_service> service_impl_type;
 #elif defined(BOOST_ASIO_HAS_EPOLL)
   typedef detail::deadline_timer_service<
     traits_type, detail::epoll_reactor<false> > service_impl_type;
Modified: trunk/boost/asio/detail/win_iocp_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_io_service.hpp	(original)
+++ trunk/boost/asio/detail/win_iocp_io_service.hpp	2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
@@ -33,7 +33,9 @@
 #include <boost/asio/detail/handler_invoke_helpers.hpp>
 #include <boost/asio/detail/service_base.hpp>
 #include <boost/asio/detail/socket_types.hpp>
+#include <boost/asio/detail/timer_queue.hpp>
 #include <boost/asio/detail/win_iocp_operation.hpp>
+#include <boost/asio/detail/mutex.hpp>
 
 namespace boost {
 namespace asio {
@@ -52,7 +54,9 @@
       iocp_(),
       outstanding_work_(0),
       stopped_(0),
-      shutdown_(0)
+      shutdown_(0),
+      timer_thread_(0),
+      timer_interrupt_issued_(false)
   {
   }
 
@@ -94,6 +98,10 @@
       if (overlapped)
         static_cast<operation*>(overlapped)->destroy();
     }
+
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+      timer_queues_[i]->destroy_timers();
+    timer_queues_.clear();
   }
 
   // Register a handle with the IO completion port.
@@ -258,14 +266,96 @@
     }
   }
 
+  // Add a new timer queue to the service. The queue's address is appended to timer_queues_ while timer_mutex_ is held.
+  template <typename Time_Traits>
+  void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
+  {
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+    timer_queues_.push_back(&timer_queue);
+  }
+
+  // Remove a timer queue from the service. Erases the first entry whose address matches, under timer_mutex_; does nothing if none matches.
+  template <typename Time_Traits>
+  void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
+  {
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
+      if (timer_queues_[i] == &timer_queue)
+      {
+        timer_queues_.erase(timer_queues_.begin() + i);
+        return;
+      }
+    }
+  }
+
+  // Schedule a timer in the given timer queue to expire at the specified
+  // absolute time. The handler object will be invoked when the timer expires.
+  template <typename Time_Traits, typename Handler>
+  void schedule_timer(timer_queue<Time_Traits>& timer_queue,
+      const typename Time_Traits::time_type& time, Handler handler, void* token)
+  {
+    // If the service has been shut down we silently discard the timer.
+    if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) // adding 0 reads the flag without modifying it
+      return;
+
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+    if (timer_queue.enqueue_timer(time, handler, token)) // NOTE(review): presumably true when the wait must be recalculated; confirm in timer_queue
+    {
+      if (!timer_interrupt_issued_) // post at most one wake-up until do_one resets the flag
+      {
+        timer_interrupt_issued_ = true;
+        lock.unlock(); // release timer_mutex_ before posting to the completion port
+        ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); // completion key 1: do_one treats it as a timer wake-up
+      }
+    }
+  }
+
+  // Cancel the timer associated with the given token. Returns the number of
+  // handlers that have been posted or dispatched.
+  template <typename Time_Traits>
+  std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
+  {
+    // If the service has been shut down we silently ignore the cancellation.
+    if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) // adding 0 reads the flag without modifying it
+      return 0;
+
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+    std::size_t n = timer_queue.cancel_timer(token); // the count reported by the queue is returned unchanged
+    if (n > 0 && !timer_interrupt_issued_) // wake a thread only if something was cancelled and no wake-up is pending
+    {
+      timer_interrupt_issued_ = true;
+      lock.unlock(); // release timer_mutex_ before posting to the completion port
+      ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); // completion key 1: do_one treats it as a timer wake-up
+    }
+    return n;
+  }
+
 private:
   // Dequeues at most one operation from the I/O completion port, and then
   // executes it. Returns the number of operations that were dequeued (i.e.
   // either 0 or 1).
   size_t do_one(bool block, boost::system::error_code& ec)
   {
+    bool doing_timers = false;
     for (;;)
     {
+      // Try to become the timer thread.
+      if (!doing_timers)
+      {
+        doing_timers = (InterlockedCompareExchange(&timer_thread_,
+              static_cast<long>(GetCurrentThreadId()), 0) == 0);
+      }
+
+      // Calculate timeout for GetQueuedCompletionStatus call.
+      DWORD timeout = 1000;
+      if (doing_timers)
+      {
+        boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+        timer_interrupt_issued_ = false;
+        timeout = get_timeout();
+      }
+
       // Get the next operation from the queue.
       DWORD bytes_transferred = 0;
 #if (WINVER < 0x0500)
@@ -276,18 +366,45 @@
       LPOVERLAPPED overlapped = 0;
       ::SetLastError(0);
       BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
-          &completion_key, &overlapped, block ? 1000 : 0);
+          &completion_key, &overlapped, block ? timeout : 0);
       DWORD last_error = ::GetLastError();
 
+      // Dispatch any pending timers.
+      if (doing_timers)
+      {
+        boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+        for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+        {
+          timer_queues_[i]->dispatch_timers();
+          timer_queues_[i]->dispatch_cancellations();
+        }
+
+        // Clean up timers. We must not hold the lock while cleaning up timers
+        // since the destructors may make calls back into this service. We make
+        // a copy of the vector of timer queues since the original may be
+        // modified while the lock is not held.
+        timer_queues_for_cleanup_ = timer_queues_;
+        lock.unlock();
+        for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
+          timer_queues_for_cleanup_[i]->cleanup_timers();
+      }
+
       if (!ok && overlapped == 0)
       {
         if (block && last_error == WAIT_TIMEOUT)
           continue;
+
+        // Pass responsibility for timers to another thread.
+        if (doing_timers)
+        {
+          ::InterlockedExchange(&timer_thread_, 0);
+          ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+        }
+
         ec = boost::system::error_code();
         return 0;
       }
-
-      if (overlapped)
+      else if (overlapped)
       {
         // We may have been passed a last_error value in the completion_key.
         if (last_error == 0)
@@ -295,6 +412,13 @@
           last_error = completion_key;
         }
 
+        // Pass responsibility for timers to another thread.
+        if (doing_timers)
+        {
+          ::InterlockedExchange(&timer_thread_, 0);
+          ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+        }
+
         // Ensure that the io_service does not exit due to running out of work
         // while we make the upcall.
         auto_work work(*this);
@@ -306,12 +430,22 @@
         ec = boost::system::error_code();
         return 1;
       }
+      else if (completion_key == 1)
+      {
+        // Woken up to try to become the timer thread.
+      }
       else
       {
         // The stopped_ flag is always checked to ensure that any leftover
         // interrupts from a previous run invocation are ignored.
         if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
         {
+          // Pass responsibility for timers to another thread.
+          if (doing_timers)
+          {
+            ::InterlockedExchange(&timer_thread_, 0);
+          }
+
           // Wake up next thread that is blocked on GetQueuedCompletionStatus.
           if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
           {
@@ -328,6 +462,45 @@
     }
   }
 
+  // Check if all timer queues are empty. Takes no lock itself; the visible call path (do_one -> get_timeout) holds timer_mutex_.
+  bool all_timer_queues_are_empty() const
+  {
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+      if (!timer_queues_[i]->empty())
+        return false; // stop at the first queue that still holds a timer
+    return true;
+  }
+
+  // Get the timeout, in milliseconds, for the GetQueuedCompletionStatus call:
+  // 1000 if every queue is empty, 0 if the shortest wait is not positive, and
+  // otherwise the shortest wait limited to the range 1 to 1000 milliseconds.
+  DWORD get_timeout()
+  {
+    if (all_timer_queues_are_empty())
+      return 1000;
+
+    boost::posix_time::time_duration minimum_wait_duration
+      = boost::posix_time::seconds(1); // upper bound on the wait
+
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
+      boost::posix_time::time_duration wait_duration
+        = timer_queues_[i]->wait_duration();
+      if (wait_duration < minimum_wait_duration)
+        minimum_wait_duration = wait_duration; // keep the shortest wait seen so far
+    }
+
+    if (minimum_wait_duration > boost::posix_time::time_duration()) // i.e. strictly greater than a zero-length duration
+    {
+      int milliseconds = minimum_wait_duration.total_milliseconds();
+      return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1); // a positive wait that comes out as 0 ms is reported as 1
+    }
+    else
+    {
+      return 0;
+    }
+  }
+
   struct auto_work
   {
     auto_work(win_iocp_io_service& io_service)
@@ -417,6 +590,22 @@
 
   // Flag to indicate whether the service has been shut down.
   long shutdown_;
+
+  // The thread that's currently in charge of dispatching timers.
+  long timer_thread_;
+
+  // Mutex for protecting access to the timer queues.
+  mutex timer_mutex_;
+
+  // Whether a thread has been interrupted to process a new timeout.
+  bool timer_interrupt_issued_;
+
+  // The timer queues.
+  std::vector<timer_queue_base*> timer_queues_;
+
+  // A copy of the timer queues, used when cleaning up timers. The copy is
+  // stored as a class data member to avoid unnecessary memory allocation.
+  std::vector<timer_queue_base*> timer_queues_for_cleanup_;
 };
 
 } // namespace detail