Subject: [Boost-commit] svn:boost r55510 - sandbox/libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-10 15:28:10
Author: chenderson
Date: 2009-08-10 15:28:09 EDT (Mon, 10 Aug 2009)
New Revision: 55510
URL: http://svn.boost.org/trac/boost/changeset/55510
Log:
Multi-test addition
Text files modified: 
   sandbox/libs/mapreduce/test/mrtest.cpp |   271 +++++++++++++++++++++------------------ 
   1 files changed, 149 insertions(+), 122 deletions(-)
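For context: this change turns wordcount::combiner into a class template over the reduce task type and moves the benchmarking/reporting loop into a run_test<Job>() helper, so main() can exercise several job configurations (with and without a combiner, with in-memory or local-disk intermediates) in one run instead of recompiling with different #defines. A minimal usage sketch, assuming the sandbox boost::mapreduce API exactly as it appears in the diff below (the input path is a placeholder):

    // Sketch only: one of the job configurations exercised by run_test<> in main().
    typedef boost::mapreduce::job<
        wordcount::map_task_type
      , wordcount::reduce_task
      , wordcount::combiner<wordcount::reduce_task>
    > combiner_job;

    boost::mapreduce::specification spec;
    spec.input_directory = "./texts";   // placeholder input directory
    run_test<combiner_job>(spec);       // prints timing statistics and the top-10 word counts
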
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp	(original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp	2009-08-10 15:28:09 EDT (Mon, 10 Aug 2009)
@@ -12,9 +12,6 @@
 
 // configuration options
 #define WORD_COUNT_MEMORY_MAP_FILE
-#define USE_WORDCOUNT_COMBINER
-#define USE_IN_MEMORY_INTERMEDIATES
-//#define WRITE_OUTPUT_FILES
 
 #if defined(_DEBUG)
 #   if 0
@@ -161,33 +158,7 @@
     }
 }
 
-
-class combiner;
-
-typedef map_task<map_value_type> map_task_type;
-
-typedef
-boost::mapreduce::job<
-    wordcount::map_task_type
-  , wordcount::reduce_task
-#ifdef USE_WORDCOUNT_COMBINER
-  , wordcount::combiner
-#else
-  , boost::mapreduce::null_combiner
-#endif
-  , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
-#ifdef USE_IN_MEMORY_INTERMEDIATES
-  , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type, wordcount::reduce_task>
-#else
-  , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
-#endif
-#if defined(USE_IN_MEMORY_INTERMEDIATES)  &&  defined(WRITE_OUTPUT_FILES)
-  , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task>
-#endif
-> job;
-
-
-
+template<typename ReduceTask>
 class combiner
 {
   public:
@@ -198,19 +169,19 @@
         intermediate_store.combine(instance);
     }
 
-    void start(job::reduce_task_type::key_type const &)
+    void start(typename ReduceTask::key_type const &)
     {
         total_ = 0;
     }
 
     template<typename IntermediateStore>
-    void finish(job::reduce_task_type::key_type const &key, IntermediateStore &intermediate_store)
+    void finish(typename ReduceTask::key_type const &key, IntermediateStore &intermediate_store)
     {
         if (total_ > 0)
             intermediate_store.insert(key, total_);
     }
 
-    void operator()(job::reduce_task_type::value_type const &value)
+    void operator()(typename ReduceTask::value_type const &value)
     {
         total_ += value;
     }
@@ -222,9 +193,122 @@
     unsigned total_;
 };
 
+typedef map_task<map_value_type> map_task_type;
+
 }   // namespace wordcount
 
 
+template<typename Job>
+void run_test(boost::mapreduce::specification spec)
+{
+    boost::mapreduce::results result;
+
+    std::cout << "\n" << typeid(Job).name() << "\n";
+
+    try
+    {
+#ifdef RUN_SEQUENTIAL_MAP_REDUCE
+        std::cout << "\nRunning Sequential MapReduce...";
+
+        spec.map_tasks = 1;
+        spec.reduce_tasks = 1;
+        typename Job::datasource_type datasource(spec);
+        Job job(datasource, spec);
+        job.run<boost::mapreduce::schedule_policy::sequential<Job> >(result);
+        std::cout << "\nSequential MapReduce Finished.";
+#else
+        std::cout << "\nRunning CPU Parallel MapReduce...";
+
+        // this method can be called, but since we want access to the result data,
+        // we need to have a job object to interrogate
+        //boost::mapreduce::run<Job>(spec, result);
+
+        typename Job::datasource_type datasource(spec);
+        Job job(datasource, spec);
+        job.run<boost::mapreduce::schedule_policy::cpu_parallel<Job> >(result);
+        std::cout << "\nCPU Parallel MapReduce Finished.\n";
+#endif
+
+        std::cout << "\nMapReduce statistics:";
+        std::cout << "\n  MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
+        std::cout << "\n    Map phase runtime                       : " << result.map_runtime << " seconds";
+        std::cout << "\n    Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
+        std::cout << "\n\n  Map:";
+        std::cout << "\n    Total Map keys                          : " << result.counters.map_keys_executed;
+        std::cout << "\n    Map keys processed                      : " << result.counters.map_keys_completed;
+        std::cout << "\n    Map key processing errors               : " << result.counters.map_key_errors;
+        std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
+        std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+        std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+        std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
+
+        std::cout << "\n\n  Reduce:";
+        std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
+        std::cout << "\n    Reduce keys processed                   : " << result.counters.reduce_keys_completed;
+        std::cout << "\n    Reduce key processing errors            : " << result.counters.reduce_key_errors;
+        std::cout << "\n    Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+        std::cout << "\n    Number of Result Files                  : " << result.counters.num_result_files;
+        if (result.reduce_times.size() > 0)
+        {
+            std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+            std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+            std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
+        }
+
+        typename Job::const_result_iterator it  = job.begin_results();
+        typename Job::const_result_iterator ite = job.end_results();
+        if (it != ite)
+        {
+            typedef std::list<typename Job::keyvalue_t> frequencies_t;
+            frequencies_t frequencies;
+            frequencies.push_back(*it);
+            typename frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
+            for (++it; it!=ite; ++it)
+            {
+                if (frequencies.size() < 10)    // show top 10
+                {
+                    frequencies.push_back(*it);
+                    if (it->second < it_smallest->second)
+                        it_smallest = frequencies.rbegin();
+                }
+                else if (it->second > it_smallest->second)
+                {
+                    *it_smallest = *it;
+                    it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<typename Job::keyvalue_t>);
+                }
+            }
+
+            frequencies.sort(boost::mapreduce::detail::greater_2nd<typename Job::keyvalue_t>);
+            std::cout << "\n\nMapReduce results:";
+            for (typename frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
+                std::cout << "\n" << freq->first << "\t" << freq->second;
+        }
+    }
+    catch (std::exception &e)
+    {
+        std::cout << std::endl << "Error running MapReduce: " << e.what();
+    }
+}
+
+//typedef
+//boost::mapreduce::job<
+//    wordcount::map_task_type
+//  , wordcount::reduce_task
+//#ifdef USE_WORDCOUNT_COMBINER
+//  , wordcount::combiner<wordcount::reduce_task>
+//#else
+//  , boost::mapreduce::null_combiner
+//#endif
+//  , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+//#ifdef USE_IN_MEMORY_INTERMEDIATES
+//  , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type, wordcount::reduce_task>
+//#else
+//  , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
+//#endif
+//#if defined(USE_IN_MEMORY_INTERMEDIATES)  &&  defined(WRITE_OUTPUT_FILES)
+//  , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task>
+//#endif
+//> job;
 
 int main(int argc, char **argv)
 {
@@ -241,27 +325,8 @@
     }
 
     boost::mapreduce::specification spec;
-    boost::mapreduce::results       result;
 
     spec.input_directory = argv[1];
-    wordcount::job::datasource_type datasource(spec);
-
-    std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
-    std::cout << "\n" << typeid(wordcount::job).name() << "\n";
-
-#ifdef RUN_SEQUENTIAL_MAP_REDUCE
-    std::cout << "\nRunning Sequential MapReduce...";
-
-    spec.map_tasks = 1;
-    spec.reduce_tasks = 1;
-
-    wordcount::job job(datasource, spec);
-    try
-    {
-        job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
-        std::cout << "\nSequential MapReduce Finished.";
-#else
-    std::cout << "\nRunning CPU Parallel MapReduce...";
 
     if (argc > 2)
         spec.map_tasks = atoi(argv[2]);
@@ -271,76 +336,38 @@
     else
         spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
 
-    // this method can be called, but since we want access to the result data,
-    // we need to have a job object to interrogate
-    //boost::mapreduce::run<wordcount::job>(spec, result);
-
-    wordcount::job job(datasource, spec);
-    try
-    {
-        job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
-        std::cout << "\nCPU Parallel MapReduce Finished.\n";
-#endif
-    }
-    catch (std::exception &e)
-    {
-        std::cout << std::endl << "Error running MapReduce: " << e.what();
-    }
-
-    std::cout << "\nMapReduce statistics:";
-    std::cout << "\n  MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
-    std::cout << "\n    Map phase runtime                       : " << result.map_runtime << " seconds";
-    std::cout << "\n    Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
-    std::cout << "\n\n  Map:";
-    std::cout << "\n    Total Map keys                          : " << result.counters.map_keys_executed;
-    std::cout << "\n    Map keys processed                      : " << result.counters.map_keys_completed;
-    std::cout << "\n    Map key processing errors               : " << result.counters.map_key_errors;
-    std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
-    std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
-
-    std::cout << "\n\n  Reduce:";
-    std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
-    std::cout << "\n    Reduce keys processed                   : " << result.counters.reduce_keys_completed;
-    std::cout << "\n    Reduce key processing errors            : " << result.counters.reduce_key_errors;
-    std::cout << "\n    Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
-    std::cout << "\n    Number of Result Files                  : " << result.counters.num_result_files;
-    if (result.reduce_times.size() > 0)
-    {
-        std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
-    }
-
-    wordcount::job::const_result_iterator it  = job.begin_results();
-    wordcount::job::const_result_iterator ite = job.end_results();
-    if (it != ite)
-    {
-        typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
-        frequencies_t frequencies;
-        frequencies.push_back(*it);
-        frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
-        for (++it; it!=ite; ++it)
-        {
-            if (frequencies.size() < 10)    // show top 10
-            {
-                frequencies.push_back(*it);
-                if (it->second < it_smallest->second)
-                    it_smallest = frequencies.rbegin();
-            }
-            else if (it->second > it_smallest->second)
-            {
-                *it_smallest = *it;
-                it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
-            }
-        }
-
-        frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
-        std::cout << "\n\nMapReduce results:";
-        for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
-            std::cout << "\n" << freq->first << "\t" << freq->second;
-    }
+    std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
+    run_test<
+        boost::mapreduce::job<
+            wordcount::map_task_type
+          , wordcount::reduce_task>
+        >(spec);
+
+    run_test<
+        boost::mapreduce::job<
+            wordcount::map_task_type
+          , wordcount::reduce_task
+          , wordcount::combiner<wordcount::reduce_task> >
+        >(spec);
+
+    run_test<
+        boost::mapreduce::job<
+            wordcount::map_task_type
+          , wordcount::reduce_task
+          , wordcount::combiner<wordcount::reduce_task>
+          , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+          , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type, wordcount::reduce_task>
+          , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task> >
+        >(spec);
+
+    run_test<
+        boost::mapreduce::job<
+            wordcount::map_task_type
+          , wordcount::reduce_task
+          , wordcount::combiner<wordcount::reduce_task>
+          , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+          , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task> >
+        >(spec);
 
     return 0;
 }