<?php $include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r55343 - in sandbox: boost/mapreduce boost/mapreduce/intermediates libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-01 10:15:07
Author: chenderson
Date: 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
New Revision: 55343
URL: http://svn.boost.org/trac/boost/changeset/55343
Log:
Refined the type definitions for key/value pair types in Map and Reduce Tasks
Text files modified: 
   sandbox/boost/mapreduce/intermediates/in_memory.hpp     |    15 ++++++++-------                         
   sandbox/boost/mapreduce/intermediates/local_disk.hpp    |    25 +++++++++++++------------               
   sandbox/boost/mapreduce/job.hpp                         |    21 ++++++++-------------                   
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp |    14 ++++++--------                          
   sandbox/libs/mapreduce/test/mrtest.cpp                  |    29 +++++++++++++++++------------           
   5 files changed, 52 insertions(+), 52 deletions(-)
Modified: sandbox/boost/mapreduce/intermediates/in_memory.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/in_memory.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates/in_memory.hpp	2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -29,8 +29,8 @@
     {
     }
 
-    void operator()(typename MapTask::intermediate_key_type const &/*key*/,
-                    typename ReduceTask::value_type         const &/*value*/)
+    void operator()(typename ReduceTask::key_type   const &/*key*/,
+                    typename ReduceTask::value_type const &/*value*/)
     {
     }
 };
@@ -46,12 +46,13 @@
     typedef
     std::vector<
         std::map<
-            typename MapTask::intermediate_key_type,
-            std::list<typename MapTask::intermediate_value_type> > >
+            typename ReduceTask::key_type,
+            std::list<typename ReduceTask::value_type> > >
     intermediates_t;
 
   public:
-    typedef MapTask map_task_type;
+    typedef MapTask    map_task_type;
+    typedef ReduceTask reduce_task_type;
     typedef reduce_null_output<MapTask, ReduceTask> store_result_type;
 
     in_memory(unsigned const num_partitions)
@@ -99,8 +100,8 @@
     }
 
 
-    bool const insert(typename map_task_type::intermediate_key_type   const &key,
-                      typename map_task_type::intermediate_value_type const &value)
+    bool const insert(typename reduce_task_type::key_type   const &key,
+                      typename reduce_task_type::value_type const &value)
     {
         typename intermediates_t::value_type &map = intermediates_[partitioner_(key, num_partitions_)];
 
Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp	2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -108,8 +108,8 @@
         output_file_.open(filename_.c_str());
     }
 
-    void operator()(typename MapTask::intermediate_key_type const &key,
-                    typename ReduceTask::value_type         const &value)
+    void operator()(typename ReduceTask::key_type   const &key,
+                    typename ReduceTask::value_type const &value)
     {
         output_file_ << key << "\t" << value << "\n";
     }
@@ -138,7 +138,8 @@
     intermediates_t;
 
   public:
-    typedef MapTask map_task_type;
+    typedef MapTask    map_task_type;
+    typedef ReduceTask reduce_task_type;
     typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
 
     local_disk(unsigned const num_partitions)
@@ -162,8 +163,8 @@
         }
     }
 
-    bool const insert(typename map_task_type::intermediate_key_type   const &key,
-                      typename map_task_type::intermediate_value_type const &value)
+    bool const insert(typename reduce_task_type::key_type   const &key,
+                      typename reduce_task_type::value_type const &value)
     {
         unsigned const partition = partitioner_(key, num_partitions_);
 
@@ -208,7 +209,7 @@
             std::ifstream infile(infilename.c_str());
             while (!infile.eof())
             {
-                typename map_task_type::intermediate_value_type value;
+                typename reduce_task_type::value_type value;
                 if (read_record(infile, key, value))
                 {
                     if (key != last_key  &&  key.length() > 0)
@@ -276,10 +277,10 @@
     template<typename Callback>
     void reduce(unsigned const partition, Callback &callback, results &result)
     {
-        typename map_task_type::intermediate_key_type   key;
-        typename map_task_type::intermediate_key_type   last_key;
-        typename map_task_type::intermediate_value_type value;
-        std::list<typename map_task_type::intermediate_value_type> values;
+        typename reduce_task_type::key_type   key;
+        typename reduce_task_type::key_type   last_key;
+        typename reduce_task_type::value_type value;
+        std::list<typename reduce_task_type::value_type> values;
 
         std::list<std::string> filenames;
         intermediates_t::const_iterator it = intermediate_files_.find(partition);
@@ -316,8 +317,8 @@
 
   protected:
     static bool const read_record(std::ifstream &infile,
-                                  typename map_task_type::intermediate_key_type   &key,
-                                  typename map_task_type::intermediate_value_type &value)
+                                  typename reduce_task_type::key_type   &key,
+                                  typename reduce_task_type::value_type &value)
     {
 #if defined(__SGI_STL_PORT)
         size_t keylen;
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp	(original)
+++ sandbox/boost/mapreduce/job.hpp	2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -19,24 +19,19 @@
 
 template<typename T> size_t length(T const &str);
 
-template<
-    typename MapKey,
-    typename MapValue,
-    typename IntermediateKey,
-    typename IntermediateValue>
+template<typename MapKey, typename MapValue>
 class map_task
 {
   public:
     typedef MapKey              key_type;
     typedef MapValue            value_type;             
-    typedef IntermediateKey     intermediate_key_type;
-    typedef IntermediateValue   intermediate_value_type;
 };
 
-template<typename ReduceValue>
+template<typename ReduceKey, typename ReduceValue>
 class reduce_task
 {
   public:
+    typedef ReduceKey   key_type;
     typedef ReduceValue value_type;
 };
 
@@ -78,8 +73,8 @@
             return *this;
         }
 
-        bool const emit_intermediate(typename map_task_type::intermediate_key_type   const &key,
-                                     typename map_task_type::intermediate_value_type const &value)
+        bool const emit_intermediate(typename reduce_task_type::key_type   const &key,
+                                     typename reduce_task_type::value_type const &value)
         {
             return intermediate_store_.insert(key, value);
         }
@@ -105,14 +100,14 @@
         {
         }
 
-        void emit(typename map_task_type::intermediate_key_type const &key,
-                  typename reduce_task_type::value_type         const &value)
+        void emit(typename reduce_task_type::key_type   const &key,
+                  typename reduce_task_type::value_type const &value)
         {
             store_result_(key, value);
         }
 
         template<typename It>
-        void operator()(typename map_task_type::intermediate_key_type const &key, It it, It ite)
+        void operator()(typename reduce_task_type::key_type const &key, It it, It ite)
         {
             reduce_task_type::reduce(*this, key, it, ite);
         }
Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp	(original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp	2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -29,10 +29,8 @@
 namespace wordcount {
 
 struct map_task : public boost::mapreduce::map_task<
-                             std::string,                           // MapKey
-                             std::pair<char const *, char const *>, // MapValue
-                             std::string,                           // IntermediateKey
-                             unsigned>                              // IntermediateValue
+                             std::string,                            // MapKey
+                             std::pair<char const *, char const *> > // MapValue
 {
     template<typename Runtime>
     static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
@@ -79,7 +77,7 @@
     }
 };
 
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
 {
     template<typename Runtime, typename It>
     static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -98,19 +96,19 @@
         intermediate_store.combine(instance);
     }
 
-    void start(map_task::intermediate_key_type const &)
+    void start(reduce_task::key_type const &)
     {
         total_ = 0;
     }
 
     template<typename IntermediateStore>
-    void finish(map_task::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+    void finish(reduce_task::key_type const &key, IntermediateStore &intermediate_store)
     {
         if (total_ > 0)
             intermediate_store.insert(key, total_);
     }
 
-    void operator()(map_task::intermediate_value_type const &value)
+    void operator()(reduce_task::value_type const &value)
     {
         total_ += value;
     }
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp	(original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp	2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -50,18 +50,14 @@
 map_value_type;
 
 template<typename T>
-struct map_task : public boost::mapreduce::map_task<
-                             std::string,            // MapKey
-                             map_value_type,         // MapValue
-                             std::string,            // IntermediateKey
-                             unsigned>               // IntermediateValue
+struct map_task
+  : public boost::mapreduce::map_task<std::string, map_value_type>
 {
     template<typename Runtime>
     static void map(Runtime &runtime, std::string const &key, T &value);
 };
-typedef map_task<map_value_type> map_task_type;
 
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
 {
     template<typename Runtime, typename It>
     static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -167,6 +163,8 @@
 
 class combiner;
 
+typedef map_task<map_value_type> map_task_type;
+
 typedef
 boost::mapreduce::job<
     wordcount::map_task_type
@@ -197,19 +195,19 @@
         intermediate_store.combine(instance);
     }
 
-    void start(job::map_task_type::intermediate_key_type const &)
+    void start(job::reduce_task_type::key_type const &)
     {
         total_ = 0;
     }
 
     template<typename IntermediateStore>
-    void finish(job::map_task_type::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+    void finish(job::reduce_task_type::key_type const &key, IntermediateStore &intermediate_store)
     {
         if (total_ > 0)
             intermediate_store.insert(key, total_);
     }
 
-    void operator()(job::map_task_type::intermediate_value_type const &value)
+    void operator()(job::reduce_task_type::value_type const &value)
     {
         total_ += value;
     }
@@ -256,7 +254,7 @@
         spec.map_tasks = 1;
         spec.reduce_tasks = 1;
 
-        wordcount::job      job(datasource, spec);
+        wordcount::job job(datasource, spec);
         job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
         std::cout << "\nFinished.";
 #else
@@ -266,7 +264,12 @@
         if (argc > 2)
             spec.map_tasks = atoi(argv[2]);
 
-        boost::mapreduce::run<wordcount::job>(spec, result);
+        // this method can be called, but since we want access to the result data,
+        // we need to have a job object to interrogate
+        //boost::mapreduce::run<wordcount::job>(spec, result);
+
+        wordcount::job job(datasource, spec);
+        job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
         std::cout << "\nCPU Parallel MapReduce Finished.";
 #endif
     }
@@ -275,6 +278,8 @@
         std::cout << std::endl << "Error: " << e.what();
     }
 
+    typedef std::pair<wordcount::reduce_task::key_type, wordcount::reduce_task::value_type> keyvalue_t;
+
     std::cout << std::endl << "\n" << "MapReduce statistics:";
     std::cout << "\n  " << "MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
     std::cout << "\n  " << "  Map phase runtime                       : " << result.map_runtime << " seconds";