<?php $include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r55453 - in sandbox: boost boost/mapreduce boost/mapreduce/schedule_policy libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-07 17:55:02
Author: chenderson
Date: 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
New Revision: 55453
URL: http://svn.boost.org/trac/boost/changeset/55453
Log:
Sub-second timing of map/reduce tasks
Text files modified: 
   sandbox/boost/mapreduce.hpp                              |    18 ++++++------------                      
   sandbox/boost/mapreduce/job.hpp                          |    20 +++++++++++---------                    
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp |    11 ++++++-----                             
   sandbox/boost/mapreduce/schedule_policy/sequential.hpp   |    10 ++++++----                              
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp  |     4 ++--                                    
   sandbox/libs/mapreduce/test/mrtest.cpp                   |     4 ++--                                    
   6 files changed, 33 insertions(+), 34 deletions(-)
Modified: sandbox/boost/mapreduce.hpp
==============================================================================
--- sandbox/boost/mapreduce.hpp	(original)
+++ sandbox/boost/mapreduce.hpp	2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -24,6 +24,7 @@
 #include <boost/config.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/cstdint.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 
 namespace boost {
 
@@ -79,18 +80,11 @@
         }
     } counters;
 
-    results()
-      : job_runtime(0),
-        map_runtime(0),
-        reduce_runtime(0)
-    {
-    }
-
-    time_t                job_runtime;
-    time_t                map_runtime;
-    time_t                reduce_runtime;
-    std::vector<time_t>   map_times;
-    std::vector<time_t>   reduce_times;
+    boost::posix_time::time_duration                job_runtime;
+    boost::posix_time::time_duration                map_runtime;
+    boost::posix_time::time_duration                reduce_runtime;
+    std::vector<boost::posix_time::time_duration>   map_times;
+    std::vector<boost::posix_time::time_duration>   reduce_times;
 };
 
 }   // namespace mapreduce
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp	(original)
+++ sandbox/boost/mapreduce/job.hpp	2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -12,7 +12,7 @@
 
 #ifndef BOOST_MAPREDUCE_JOB_HPP
 #define BOOST_MAPREDUCE_JOB_HPP
- 
+
 namespace boost {
 
 namespace mapreduce {
@@ -174,17 +174,19 @@
     template<typename SchedulePolicy>
     void run(SchedulePolicy &schedule, results &result)
     {
-        time_t const start_time = time(NULL);
+        using namespace boost::posix_time;
+        ptime start_time(microsec_clock::universal_time());
         schedule(*this, result);
-        result.job_runtime = time(NULL) - start_time;
+        result.job_runtime = microsec_clock::universal_time() - start_time;
     }
 
     template<typename Sync>
     bool const run_map_task(void *key, results &result, Sync &sync)
     {
-        bool success = true;
-        time_t const start_time = time(NULL);
+        using namespace boost::posix_time;
+        ptime start_time(microsec_clock::universal_time());
 
+        bool success = true;
         try
         {
             std::auto_ptr<typename map_task_type::key_type>
@@ -226,7 +228,7 @@
             ++result.counters.map_key_errors;
             success = false;
         }
-        result.map_times.push_back(time(NULL)-start_time);
+        result.map_times.push_back(microsec_clock::universal_time() - start_time);
 
         return success;
     }
@@ -243,8 +245,8 @@
 
     bool const run_reduce_task(unsigned const partition, results &result)
     {
-        time_t const start_time = time(NULL);
-
+        using namespace boost::posix_time;
+        ptime start_time(microsec_clock::universal_time());
         try
         {
             reduce_task_runner runner(
@@ -261,7 +263,7 @@
             ++result.counters.reduce_key_errors;
         }
         
-        result.reduce_times.push_back(time(NULL)-start_time);
+        result.reduce_times.push_back(microsec_clock::universal_time()-start_time);
 
         return true;
     }
Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp	(original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp	2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -87,11 +87,12 @@
         boost::mutex  m1, m2;
 
         // run the Map Tasks
-        time_t start_time = time(NULL);
-        boost::thread_group map_threads;
+        using namespace boost::posix_time;
+        ptime start_time(microsec_clock::universal_time());
 
         unsigned const map_tasks = std::max(num_cpus,std::min(num_cpus, job.number_of_map_tasks()));
 
+        boost::thread_group map_threads;
         for (unsigned loop=0; loop<map_tasks; ++loop)
         {
             boost::shared_ptr<results> this_result(new results);
@@ -107,10 +108,10 @@
             map_threads.add_thread(thread);
         }
         map_threads.join_all();
-        result.map_runtime = time(NULL) - start_time;
+        result.map_runtime = microsec_clock::universal_time() - start_time;
 
         // run the Reduce Tasks
-        start_time = time(NULL);
+        start_time = microsec_clock::universal_time();
         boost::thread_group reduce_threads;
 
         unsigned const reduce_tasks =
@@ -132,7 +133,7 @@
             reduce_threads.add_thread(thread);
         }
         reduce_threads.join_all();
-        result.reduce_runtime = time(NULL) - start_time;
+        result.reduce_runtime = microsec_clock::universal_time() - start_time;
 
         // we're done with the map/reduce job, collate the statistics before returning
         for (all_results_t::const_iterator it=all_results.begin();
Modified: sandbox/boost/mapreduce/schedule_policy/sequential.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/sequential.hpp	(original)
+++ sandbox/boost/mapreduce/schedule_policy/sequential.hpp	2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -36,19 +36,21 @@
   public:
     void operator()(Job &job, results &result)
     {
+        using namespace boost::posix_time;
+        ptime start_time(microsec_clock::universal_time());
+
         // Map Tasks
-        time_t start_time = time(NULL);
         void *key = 0;
         detail::null_lock nolock;
         while (job.get_next_map_key(key)  &&  job.run_map_task(key, result, nolock))
             ;
-        result.map_runtime = time(NULL) - start_time;
+        result.map_runtime = microsec_clock::universal_time() - start_time;
 
         // Reduce Tasks
-        start_time = time(NULL);
+        start_time = microsec_clock::universal_time();
         for (unsigned partition=0; partition<job.number_of_partitions(); ++partition)
             job.run_reduce_task(partition, result);
-        result.reduce_runtime = time(NULL) - start_time;
+        result.reduce_runtime = microsec_clock::universal_time() - start_time;
 
         result.counters.actual_map_tasks    = 1;
         result.counters.actual_reduce_tasks = 1;
Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp	(original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp	2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -174,7 +174,7 @@
         std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
         std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
         std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-        std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+        std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
 
         std::cout << "\n\n  Reduce:";
         std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
@@ -186,7 +186,7 @@
         {
             std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
             std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-            std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+            std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.reduce_times.size() << " seconds";
         }
 
         wordcount::job::const_result_iterator it  = job.begin_results();
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp	(original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp	2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -298,7 +298,7 @@
     std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
     std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
     std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+    std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
 
     std::cout << "\n\n  Reduce:";
     std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
@@ -310,7 +310,7 @@
     {
         std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
         std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+        std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.reduce_times.size() << " seconds";
     }
 
     wordcount::job::const_result_iterator it  = job.begin_results();