$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r64210 - in sandbox/SOC/2010/process: boost/process boost/process/detail libs/process/example
From: boris_at_[hidden]
Date: 2010-07-20 20:37:11
Author: bschaeling
Date: 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
New Revision: 64210
URL: http://svn.boost.org/trac/boost/changeset/64210
Log:
Added support for asynchronous waiting on POSIX platforms
Text files modified: 
   sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp |   130 ++++++++++++++++++++++++--------------- 
   sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp        |     5 -                                       
   sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp          |    91 +++++++++++++++++++--------             
   sandbox/SOC/2010/process/boost/process/operations.hpp                  |    26 ++++---                                 
   sandbox/SOC/2010/process/boost/process/process.hpp                     |     4                                         
   sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp      |     8 -                                       
   sandbox/SOC/2010/process/libs/process/example/wait_child.cpp           |     6 -                                       
   7 files changed, 164 insertions(+), 106 deletions(-)
Modified: sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp	(original)
+++ sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -34,6 +34,8 @@
 #include <algorithm>
 
 #if defined(BOOST_POSIX_API)
+#   include <sys/types.h>
+#   include <sys/wait.h>
 #elif defined(BOOST_WINDOWS_API)
 #   include <windows.h>
 #else
@@ -49,17 +51,18 @@
     : public boost::asio::detail::service_base<StatusImplementation>
 {
 public:
-
-
-    typedef boost::shared_ptr<StatusImplementation> implementation_type;
-
     explicit basic_status_service(boost::asio::io_service &io_service)
         : boost::asio::detail::service_base<StatusImplementation>(io_service),
-        run_(true),
         work_thread_(&basic_status_service<StatusImplementation>::work_thread, this)
+#if defined(BOOST_POSIX_API)
+        , interrupt_pid_(-1),
+        pids_(0)
+#elif defined(BOOST_WINDOWS_API)
+        , run_(true)
+#endif
     {
 #if defined(BOOST_WINDOWS_API)
-        handles_.push_back(::CreateEvent(NULL, FALSE, FALSE, NULL));
+        handles_.push_back(CreateEvent(NULL, FALSE, FALSE, NULL));
         if (handles_[0] == NULL)
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("CreateEvent() failed");
 #endif
@@ -70,14 +73,11 @@
         stop_work_thread();
         work_thread_.join();
 #if defined(BOOST_WINDOWS_API)
-        if (!::CloseHandle(handles_[0]))
-            BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("CloseHandle() failed");
-#elif defined(BOOST_POSIX_API)
-        if (::close(handles_[0]) == -1)
-            BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("close() failed");
+        CloseHandle(handles_[0]);
 #endif
     }
 
+    typedef boost::shared_ptr<StatusImplementation> implementation_type;
 
     void construct(implementation_type &impl)
     {
@@ -88,43 +88,45 @@
 
     void destroy(implementation_type &impl)
     {
-        // impl->destroy();
+        boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
+        std::vector<implementation_type>::iterator it = std::find(impls_.begin(), impls_.end(), impl);
+        if (it != impls_.end())
+            impls_.erase(it);
         impl.reset();
-        // remove impl from impls_
     }
 
     int wait(implementation_type &impl, pid_type pid)
     {
-        /*
         boost::system::error_code ec;
-        int exit_code = impl->wait(pid, ec);
+        int status = impl.wait(pid, ec);
         boost::asio::detail::throw_error(ec);
-        */
-        int exit_code = 0;
-        return exit_code;
+        return status;
     }
 
     template <typename Handler>
     void async_wait(implementation_type &impl, pid_type pid, Handler handler)
     {
-#if defined(BOOST_WINDOWS_API)
-        HANDLE handle = ::OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, FALSE, pid);
+#if defined(BOOST_POSIX_API)
+        boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
+        if (!work_)
+            work_.reset(new boost::asio::io_service::work(this->get_io_service()));
+        ++pids_;
+        impl->async_wait(pid, this->get_io_service().wrap(handler));
+#elif defined(BOOST_WINDOWS_API)
+        HANDLE handle = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, FALSE, pid);
         if (handle == NULL)
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("OpenProcess() failed");
-#elif defined(BOOST_POSIX_API)
-        behavior::stream::native_type handle = pid;
-#endif
         boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
-        interrupt_work_thread();
-        work_thread_cond_.wait(work_thread_mutex_);
         if (!work_)
             work_.reset(new boost::asio::io_service::work(this->get_io_service()));
+        interrupt_work_thread();
+        work_thread_cond_.wait(work_thread_mutex_);
         handles_.push_back(handle);
         impl->async_wait(handle, this->get_io_service().wrap(handler));
         work_thread_cond_.notify_all();
+#endif
     }
 
-
 private:
     void shutdown_service()
     {
@@ -132,10 +134,31 @@
 
     void work_thread()
     {
-        while (running())
+        for (;;)
         {
-#if defined(BOOST_WINDOWS_API)
-            DWORD res = ::WaitForMultipleObjects(handles_.size(), &handles_[0], FALSE, INFINITE);
+#if defined(BOOST_POSIX_API)
+            int status;
+            pid_t pid = wait(&status);
+            if (pid == -1)
+            {
+                if (errno != EINTR)
+                    BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("wait(2) failed");
+            }
+            else if (interrupted(pid))
+            {
+                // On POSIX the only reason to interrupt is to break out.
+                break;
+            }
+            else
+            {
+                boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
+                for (std::vector<implementation_type>::iterator it = impls_.begin(); it != impls_.end(); ++it)
+                    (*it)->complete(pid, status);
+                if (--pids_ == 0)
+                    work_.reset();
+            }
+#elif defined(BOOST_WINDOWS_API)
+            DWORD res = WaitForMultipleObjects(handles_.size(), &handles_[0], FALSE, INFINITE);
             if (res == WAIT_FAILED)
                 BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("WaitForMultipleObjects() failed");
             else if (res - WAIT_OBJECT_0 == 0)
@@ -150,7 +173,7 @@
             {
                 HANDLE handle = handles_[res - WAIT_OBJECT_0];
                 DWORD exit_code;
-                if (!::GetExitCodeProcess(handle, &exit_code))
+                if (!GetExitCodeProcess(handle, &exit_code))
                     BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("GetExitCodeProcess() failed");
                 boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
                 for (std::vector<implementation_type>::iterator it = impls_.begin(); it != impls_.end(); ++it)
@@ -161,53 +184,58 @@
                 if (handles_.size() == 1)
                     work_.reset();
             }
-#elif defined(BOOST_POSIX_API)
-            //linux here
 #endif
         }
     }
 
-    bool running()
-    {
-        // Access to run_ is sychronized with stop_work_thread().
-        boost::mutex::scoped_lock lock(work_thread_mutex_);
-        return run_;
-    }
-
     void interrupt_work_thread()
     {
+#if defined(BOOST_POSIX_API)
+        // By creating a child process which immediately exits we interrupt wait().
+        interrupt_pid_ = create_child("/usr/sh").get_id();
+#elif defined(BOOST_WINDOWS_API)
         // By signaling the event in the first slot WaitForMultipleObjects() will return.
         // The work thread won't do anything except checking if it should continue to run.
-#if defined(BOOST_WINDOWS_API)
-        if (!::SetEvent(handles_[0]))
+        if (!SetEvent(handles_[0]))
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("SetEvent() failed");
-#elif defined(BOOST_POSIX_API)
-            //linux here
 #endif
     }
 
+#if defined(BOOST_POSIX_API)
+    bool interrupted(pid_t pid)
+    {
+        boost::mutex::scoped_lock lock(work_thread_mutex_);
+        return interrupt_pid_ == pid;
+    }
+#endif
+
     void stop_work_thread()
     {
-        // Access to run_ is sychronized with running().
         boost::mutex::scoped_lock lock(work_thread_mutex_);
+#if defined(BOOST_WINDOWS_API)
+        // Access to run_ must be synchronized with running().
         run_ = false;
+#endif
+        // Access to interrupt_pid_ must be synchronized with interrupted().
         interrupt_work_thread();
     }
 
     boost::scoped_ptr<boost::asio::io_service::work> work_;
     std::vector<implementation_type> impls_;
     boost::mutex work_thread_mutex_;
-    boost::condition_variable_any work_thread_cond_;
-    bool run_;
     boost::thread work_thread_;
-    std::vector<behavior::stream::native_type> handles_;
-
-
-
+#if defined(BOOST_POSIX_API)
+    pid_t interrupt_pid_;
+    int pids_;
+#elif defined(BOOST_WINDOWS_API)
+    bool run_;
+    boost::condition_variable_any work_thread_cond_;
+    std::vector<HANDLE> handles_;
+#endif
 };
 
 }
 }
 }
 
-#endif 
+#endif
Modified: sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp	(original)
+++ sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -99,9 +99,4 @@
 }
 }
 
-void posix_remap(int native_handle, int new_handle)
-{
-    if (::dup2(new_handle, native_handle) == -1)
-        BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed"); 
-}
 #endif
Modified: sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp	(original)
+++ sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -22,11 +22,11 @@
 #include <boost/process/config.hpp>
 #include <boost/system/error_code.hpp>
 #include <boost/ptr_container/ptr_unordered_map.hpp>
-#include <boost/bind.hpp>
-#include <boost/process/stream_behavior.hpp>
 #include <algorithm>
 
 #if defined(BOOST_POSIX_API)
+#   include <sys/types.h>
+#   include <sys/wait.h>
 #elif defined(BOOST_WINDOWS_API)
 #   include <windows.h>
 #else
@@ -37,13 +37,15 @@
 namespace process {
 namespace detail {
 
+#elif defined(BOOST_POSIX_API)
+typedef pid_t phandle;
+#elif defined(BOOST_WINDOWS_API)
+typedef HANDLE phandle;
+#endif
+
 struct operation
 {
-    #if defined(BOOST_WINDOWS_API)
-    virtual void operator()(DWORD exit_code)
-    #elif defined BOOST_POSIX_API
-    virtual void operator()(unsigned int exit_code)
-    #endif
+    virtual void operator()(int exit_code)
     {
     }
 };
@@ -57,11 +59,7 @@
     {
     }
 
-    #if defined(BOOST_WINDOWS_API)
-    void operator()(DWORD exit_code)
-    #elif defined BOOST_POSIX_API
-    void operator()(unsigned int exit_code)
-    #endif
+    void operator()(int exit_code)
     {
         handler_(boost::system::error_code(), exit_code);
     }
@@ -73,33 +71,72 @@
 class status_impl
 {
 public:
+    int wait(pid_type pid, boost::system::error_code ec)
+    {
+#if defined(BOOST_POSIX_API)
+        pid_t p;
+        int status;
+        do
+        {
+            p = waitpid(pid, &status, 0);
+        } while (p == -1 && errno == EINTR);
+        if (p == -1)
+        {
+            ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "waitpid(2) failed");
+            return -1;
+        }
+        return status;
+#elif defined(BOOST_WINDOWS_API)
+        HANDLE h = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, FALSE, pid);
+        if (h == NULL)
+        {
+            ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "OpenProcess() failed");
+            return -1;
+        }
+
+        if (WaitForSingleObject(h, INFINITE) == WAIT_FAILED)
+        {
+            CloseHandle(h);
+            ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "WaitForSingleObject() failed");
+            return -1;
+        }
+
+        DWORD exit_code;
+        if (!GetExitCodeProcess(h, &exit_code))
+        {
+            CloseHandle(h);
+            ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "GetExitCodeProcess() failed");
+            return -1;
+        }
+        if (!CloseHandle(h))
+        {
+            ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "CloseHandle() failed");
+            return -1;
+        }
+        return exit_code;
+#endif
+    }
+
     template <typename Handler>
-    void async_wait(behavior::stream::native_type handle, Handler handler)
+    void async_wait(phandle ph, Handler handler)
     {
-        ops_.insert(handle, new wrapped_handler<Handler>(handler));
+        ops_.insert(ph, new wrapped_handler<Handler>(handler));
     }
 
-    #if defined(BOOST_WINDOWS_API)
-    void complete(behavior::stream::native_type handle, DWORD exit_code)
-    #elif defined(BOOST_POSIX_API)
-    void complete(behavior::stream::native_type handle, unsigned int exit_code)
-    #endif
+    void complete(phandle ph, int exit_code)
     {
-        boost::iterator_range<operations_type::iterator> r = ops_.equal_range(handle);
+        boost::iterator_range<operations_type::iterator> r = ops_.equal_range(ph);
         for (operations_type::iterator it = r.begin(); it != r.end(); ++it)
             (*it->second)(exit_code);
         ops_.erase(r.begin(), r.end());
-        #if defined(BOOST_WINDOWS_API)
-        if (!::CloseHandle(handle))
+#if defined(BOOST_WINDOWS_API)
+        if (!CloseHandle(ph))
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("CloseHandle() failed");
-        #elif defined(BOOST_POSIX_API)
-        if (::close(handle) == -1)
-            BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("close() failed");
-        #endif
+#endif
     }
 
 private:
-    typedef boost::ptr_unordered_multimap<behavior::stream::native_type, operation> operations_type;
+    typedef boost::ptr_unordered_multimap<phandle, operation> operations_type;
     operations_type ops_;
 };
 
Modified: sandbox/SOC/2010/process/boost/process/operations.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/operations.hpp	(original)
+++ sandbox/SOC/2010/process/boost/process/operations.hpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -200,29 +200,33 @@
 
             int stdin_fd = ctx.stdin_behavior->get_child_end();
             if (stdin_fd != -1 && stdin_fd < maxdescs)
-                closeflags[stdin_fd] = false;
+            {
+                if (::dup2(stdin_fd, STDIN_FILENO) == -1)
+                    BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
+                closeflags[STDIN_FILENO] = false;
+            }
 
             int stdout_fd = ctx.stdout_behavior->get_child_end();
             if (stdout_fd != -1 && stdout_fd < maxdescs)
-                closeflags[stdout_fd] = false;
+            {
+                if (::dup2(stdout_fd, STDOUT_FILENO) == -1)
+                    BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
+                closeflags[STDOUT_FILENO] = false;
+            }
 
             int stderr_fd = ctx.stderr_behavior->get_child_end();
             if (stderr_fd != -1 && stderr_fd < maxdescs)
-                closeflags[stderr_fd] = false;
+            {
+                if (::dup2(stderr_fd, STDERR_FILENO) == -1)
+                    BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
+                closeflags[STDERR_FILENO] = false;
+            }
 
             for (int i = 0; i < maxdescs; ++i)
             {
                 if (closeflags[i])
                     ::close(i);
             }
-
-            if(closeflags[stdin_fd] == false)
-                posix_remap(STDIN_FILENO, stdin_fd);
-            if(closeflags[stdout_fd] == false)
-                posix_remap(STDOUT_FILENO, stdout_fd);
-            if(closeflags[stderr_fd] == false)
-                posix_remap(STDERR_FILENO, stderr_fd);
-
         }
         catch (const boost::system::system_error &e)
         {
Modified: sandbox/SOC/2010/process/boost/process/process.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/process.hpp	(original)
+++ sandbox/SOC/2010/process/boost/process/process.hpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -137,8 +137,7 @@
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("waitpid(2) failed"); 
         return s;
 #elif defined(BOOST_WINDOWS_API)
-
-        HANDLE h = ::OpenProcess(PROCESS_QUERY_INFORMATION |SYNCHRONIZE    , FALSE, id_); 
+        HANDLE h = ::OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, FALSE, id_); 
         if (h == NULL) 
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("OpenProcess() failed"); 
 
@@ -149,7 +148,6 @@
         } 
 
         DWORD exit_code; 
-
         if (!::GetExitCodeProcess(h, &exit_code)) 
         { 
             ::CloseHandle(h); 
Modified: sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp
==============================================================================
--- sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp	(original)
+++ sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -18,13 +18,11 @@
 
 int main() 
 { 
-
     std::string exe = find_executable_in_path("hostname"); 
-	context ctx; 
-	ctx.stdout_behavior = behavior::pipe::def(behavior::pipe::stream_type::output_stream); 
-
+    context ctx; 
+    ctx.stdout_behavior = behavior::pipe::def(behavior::pipe::stream_type::output_stream); 
     child c = create_child(exe,ctx); 
     pistream &is = c.get_stdout(); 
     std::cout << is.rdbuf(); 
-	c.wait();
+    c.wait();
 } 
Modified: sandbox/SOC/2010/process/libs/process/example/wait_child.cpp
==============================================================================
--- sandbox/SOC/2010/process/libs/process/example/wait_child.cpp	(original)
+++ sandbox/SOC/2010/process/libs/process/example/wait_child.cpp	2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -20,8 +20,6 @@
 { 
     std::string exe = find_executable_in_path("hostname"); 
     child c = create_child(exe); 
-
-	int exit_c = c.wait(); 
-	std::cout << exit_c << std::endl; 
-	
+    int exit_code = c.wait(); 
+    std::cout << exit_code << std::endl; 
 }