$include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r55327 - in sandbox: boost/mapreduce boost/mapreduce/intermediates libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-07-31 17:36:01
Author: chenderson
Date: 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
New Revision: 55327
URL: http://svn.boost.org/trac/boost/changeset/55327
Log:
Added memory-based intermediate handling policy
Text files modified: 
   sandbox/boost/mapreduce/hash_partitioner.hpp         |     2 +                                       
   sandbox/boost/mapreduce/intermediates.hpp            |     7 +----                                   
   sandbox/boost/mapreduce/intermediates/local_disk.hpp |    46 ++++++++++++++++++++++++++++++++------- 
   sandbox/boost/mapreduce/job.hpp                      |    22 ++++++-------------                     
   sandbox/libs/mapreduce/test/mrtest.cpp               |    25 ++++++++++++---------                   
   5 files changed, 62 insertions(+), 40 deletions(-)
Modified: sandbox/boost/mapreduce/hash_partitioner.hpp
==============================================================================
--- sandbox/boost/mapreduce/hash_partitioner.hpp	(original)
+++ sandbox/boost/mapreduce/hash_partitioner.hpp	2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -12,6 +12,8 @@
 
 #ifndef BOOST_MAPREDUCE_HASH_PARTITONER_HPP
 #define BOOST_MAPREDUCE_HASH_PARTITONER_HPP
+
+#include <boost/functional/hash.hpp>
  
 namespace boost {
 
Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates.hpp	2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -10,9 +10,6 @@
 // For more information, see http://www.boost.org/libs/mapreduce/
 //
  
-//!!!
-#ifdef USE_IN_MEMORY_INTERMEDIATES
-#include <boost/mapreduce/intermediates/in_memory.hpp>
-#endif  // USE_IN_MEMORY_INTERMEDIATES
-
+#include <boost/mapreduce/hash_partitioner.hpp>
 #include <boost/mapreduce/intermediates/local_disk.hpp>
+#include <boost/mapreduce/intermediates/in_memory.hpp>
Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp	2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -13,10 +13,8 @@
 #ifndef BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
 #define BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
 
-#include <iomanip>                  // setw
-#include <fstream>		// linux
-#include <boost/unordered_map.hpp>
-#include <boost/mapreduce/hash_partitioner.hpp>
+#include <iomanip>      // setw
+#include <fstream>      // linux
 
 namespace boost {
 
@@ -93,8 +91,35 @@
 
 namespace intermediates {
 
+// Reduce-output sink: each (key, value) pair passed to operator() is written
+// as one text line to a file dedicated to a single reduce partition.
+// NOTE(review): uses std::ostringstream, but this header directly includes
+// only <iomanip> and <fstream>; add <sstream> unless it is guaranteed to be
+// pulled in transitively -- TODO confirm.
+template<typename MapTask, typename ReduceTask>
+class reduce_file_output
+{
+  public:
+    // Builds the file name "<output_filespec><partition+1>_of_<num_partitions>"
+    // and opens it for writing.
+    // partition appears to be zero-based (it is printed as partition+1) --
+    // confirm against callers.
+    // NOTE(review): the result of open() is not checked; if the file cannot be
+    // created, every later write to output_file_ is silently dropped.
+    reduce_file_output(std::string const &output_filespec,
+                       unsigned    const  partition,
+                       unsigned    const  num_partitions)
+    {
+        std::ostringstream filename;
+        filename << output_filespec << partition+1 << "_of_" << num_partitions;
+        filename_ = filename.str();
+        output_file_.open(filename_.c_str());
+    }
+
+    // Appends one result record in the form "key<TAB>value\n".
+    void operator()(typename MapTask::intermediate_key_type const &key,
+                    typename ReduceTask::value_type         const &value)
+    {
+        output_file_ << key << "\t" << value << "\n";
+    }
+
+  private:
+    // NOTE(review): stored but never read within this class; the filename()
+    // accessor previously exposed by reduce_task_runner was not carried over.
+    std::string   filename_;
+    std::ofstream output_file_;   // closed by std::ofstream's own destructor
+};
+
+
 template<
     typename MapTask,
+    typename ReduceTask,
     typename SortFn=mapreduce::detail::file_sorter,
     typename MergeFn=mapreduce::detail::file_merger,
     typename PartitionFn=mapreduce::hash_partitioner>
@@ -102,11 +127,7 @@
 {
   private:
     typedef
-#ifdef _DEBUG
     std::map<
-#else
-    boost::unordered_map<
-#endif
         size_t,                                     // hash value of intermediate key (R)
         std::pair<
             std::string,                            // filename
@@ -115,6 +136,7 @@
 
   public:
     typedef MapTask map_task_type;
+    typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
 
     local_disk(unsigned const num_partitions)
       : num_partitions_(num_partitions)
@@ -141,7 +163,13 @@
                       typename map_task_type::intermediate_value_type const &value)
     {
         unsigned const partition = partitioner_(key, num_partitions_);
-        intermediates_t::iterator it = intermediate_files_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
+
+        intermediates_t::iterator it =
+            intermediate_files_.insert(
+                make_pair(
+                    partition,
+                    intermediates_t::mapped_type())).first;
+
         if (it->second.first.empty())
         {
             it->second.first = platform::get_temporary_filename();
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp	(original)
+++ sandbox/boost/mapreduce/job.hpp	2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -44,7 +44,8 @@
          typename ReduceTask,
          typename Combiner=null_combiner,
          typename Datasource=datasource::directory_iterator<MapTask>,
-         typename IntermediateStore=intermediates::local_disk<MapTask> >
+         typename IntermediateStore=intermediates::in_memory<MapTask, ReduceTask>,
+         typename StoreResult=typename IntermediateStore::store_result_type>
 class job : private boost::noncopyable
 {
   public:
@@ -99,19 +100,16 @@
       public:
         reduce_task_runner(
             std::string   const &output_filespec,
-            size_t        const  partition,
-            size_t        const  num_partitions)
+            unsigned      const  partition,
+            unsigned      const  num_partitions)
+          : store_result_(output_filespec, partition, num_partitions)
         {
-            std::ostringstream filename;
-            filename << output_filespec << partition+1 << "_of_" << num_partitions;
-            filename_ = filename.str();
-            output_file_.open(filename_.c_str());
         }
 
         void emit(typename map_task_type::intermediate_key_type const &key,
                   typename reduce_task_type::value_type         const &value)
         {
-            output_file_ << key << "\t" << value << "\n";
+            store_result_(key, value);
         }
 
         template<typename It>
@@ -120,14 +118,8 @@
             reduce_task_type::reduce(*this, key, it, ite);
         }
 
-        std::string const &filename(void) const
-        {
-            return filename_;
-        }
-
       private:
-        std::string   filename_;
-        std::ofstream output_file_;
+        StoreResult store_result_;
     };
 
     job(datasource_type &datasource, specification const &spec)
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp	(original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp	2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -9,6 +9,17 @@
 //
 // For more information, see http://www.boost.org/libs/mapreduce/
 //
+
+// configuration options
+#define WORD_COUNT_MEMORY_MAP_FILE
+#define USE_WORDCOUNT_COMBINER
+#define USE_IN_MEMORY_INTERMEDIATES
+
+#if defined(_DEBUG)
+#   define RUN_SEQUENTIAL_MAP_REDUCE
+#else
+#   define BOOST_DISABLE_ASSERTS
+#endif
  
 #if !defined(_DEBUG) &&  !defined(BOOST_DISABLE_ASSERTS)
 #   pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
@@ -26,15 +37,6 @@
 #include <crtdbg.h>
 #endif
 
-// configuration options
-#define WORD_COUNT_MEMORY_MAP_FILE
-#define USE_WORDCOUNT_COMBINER
-//#define USE_IN_MEMORY_INTERMEDIATES
-
-#if defined(_DEBUG)
-#define RUN_SEQUENTIAL_MAP_REDUCE
-#endif
-
 namespace wordcount {
 
 typedef
@@ -172,9 +174,9 @@
 #else
   , boost::mapreduce::null_combiner
 #endif
-#ifdef USE_IN_MEMORY_INTERMEDIATES
+#ifndef USE_IN_MEMORY_INTERMEDIATES
   , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
-  , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type>
+  , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
 #endif
 > job;
 
@@ -244,6 +246,7 @@
         std::cout << "\nFinished.";
 #else
         std::cout << "\nRunning CPU Parallel MapReduce...";
+        spec.reduce_tasks = 1;
 
         if (argc > 2)
             spec.map_tasks = atoi(argv[2]);