<?php $include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r55451 - in sandbox: boost boost/mapreduce boost/mapreduce/intermediates boost/mapreduce/schedule_policy libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-07 16:19:46
Author: chenderson
Date: 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
New Revision: 55451
URL: http://svn.boost.org/trac/boost/changeset/55451
Log:
Added iteration over results
Other minor changes and optimisations
Text files modified: 
   sandbox/boost/mapreduce.hpp                              |     2                                         
   sandbox/boost/mapreduce/datasource.hpp                   |     7 -                                       
   sandbox/boost/mapreduce/intermediates.hpp                |     2                                         
   sandbox/boost/mapreduce/intermediates/in_memory.hpp      |   134 ++++++++++++++++++++++++++++++++-       
   sandbox/boost/mapreduce/intermediates/local_disk.hpp     |   157 ++++++++++++++++++++++++++++++++++----- 
   sandbox/boost/mapreduce/job.hpp                          |    53 ++++++++++--                            
   sandbox/boost/mapreduce/mergesort.hpp                    |     6 +                                       
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp |    22 ++++-                                   
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp  |    47 ++++++-----                             
   sandbox/libs/mapreduce/test/mrtest.cpp                   |   122 ++++++++++++++++++++----------          
   10 files changed, 439 insertions(+), 113 deletions(-)
Modified: sandbox/boost/mapreduce.hpp
==============================================================================
--- sandbox/boost/mapreduce.hpp	(original)
+++ sandbox/boost/mapreduce.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -39,7 +39,7 @@
 
     specification()
       : map_tasks(0),                   
-        reduce_tasks(10),
+        reduce_tasks(1),
         max_file_segment_size(1048576L),    // default 1Gb               
         output_filespec("mapreduce_")   
     {
Modified: sandbox/boost/mapreduce/datasource.hpp
==============================================================================
--- sandbox/boost/mapreduce/datasource.hpp	(original)
+++ sandbox/boost/mapreduce/datasource.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -126,16 +126,13 @@
     if (it->second->offset == it->second->size)
         data_->current_file.clear();
 
-    ///!!! parameterise finding a word boundary
-    char ch = std::toupper(*value.second);
-    while ((ch == '\'' || (ch >= 'A' && ch <= 'Z'))  &&  it->second->offset != it->second->size)
+    // break on a line boundary
+    while (*value.second != '\n'  &&  *value.second != '\r'  &&  it->second->offset != it->second->size)
     {
         ++value.second;
         ++it->second->offset;
-        ch = std::toupper(*value.second);
     }
 
-//std::cout<<"\nget_data(): next offset will be " << it->second->offset;
     return true;
 }
 
Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -11,5 +11,5 @@
 //
  
 #include <boost/mapreduce/hash_partitioner.hpp>
-#include <boost/mapreduce/intermediates/local_disk.hpp>
 #include <boost/mapreduce/intermediates/in_memory.hpp>
+#include <boost/mapreduce/intermediates/local_disk.hpp>
Modified: sandbox/boost/mapreduce/intermediates/in_memory.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/in_memory.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates/in_memory.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -13,6 +13,8 @@
 #ifndef BOOST_MAPREDUCE_IN_MEMORY_INTERMEDIATES_HPP
 #define BOOST_MAPREDUCE_IN_MEMORY_INTERMEDIATES_HPP
 
+#include <boost/iterator/iterator_facade.hpp>
+
 namespace boost {
 
 namespace mapreduce {
@@ -23,6 +25,9 @@
 class reduce_null_output
 {
   public:
+    reduce_null_output()
+    { }
+
     reduce_null_output(std::string const &/*output_filespec*/,
                        unsigned    const  /*partition*/,
                        unsigned    const  /*num_partitions*/)
@@ -54,21 +59,131 @@
     typedef MapTask    map_task_type;
     typedef ReduceTask reduce_task_type;
     typedef reduce_null_output<MapTask, ReduceTask> store_result_type;
+    typedef
+    std::pair<
+        typename reduce_task_type::key_type,
+        typename reduce_task_type::value_type>
+    keyvalue_t;
+
+    class const_result_iterator
+      : public boost::iterator_facade<
+            const_result_iterator,
+            keyvalue_t const,
+            boost::forward_traversal_tag>
+    {
+        friend class boost::iterator_core_access;
+
+      protected:
+        explicit const_result_iterator(in_memory const *outer)
+          : outer_(outer)
+        {
+            BOOST_ASSERT(outer_);
+            iterators_.resize(outer_->num_partitions_);
+        }
+
+        void increment(void)
+        {
+            if (iterators_[index_] != outer_->intermediates_[index_].end())
+                ++iterators_[index_];
+            set_current();
+        }
+
+        bool const equal(const_result_iterator const &other) const
+        {
+            return value_ == other.value_;
+        }
+
+        const_result_iterator &begin(void)
+        {
+            for (unsigned loop=0; loop<outer_->num_partitions_; ++loop)
+                iterators_[loop] = outer_->intermediates_[loop].begin();
+            set_current();
+            return *this;
+        }
+
+        const_result_iterator &end(void)
+        {
+            index_ = 0;
+            value_ = keyvalue_t();
+            iterators_.clear();
+            return *this;
+        }
+
+        keyvalue_t const &dereference(void) const
+        {
+            return value_;
+        }
+
+        void set_current(void)
+        {
+            index_   = 0;
+            current_ = iterators_.begin();
+            for (; index_<outer_->num_partitions_  &&  iterators_[index_] == outer_->intermediates_[index_].end(); ++index_)
+                ++current_;
+            
+            for (unsigned loop=index_+1; loop<outer_->num_partitions_; ++loop)
+            {
+                if (iterators_[loop] != outer_->intermediates_[loop].end()  &&  **current_ > *iterators_[loop])
+                {
+                    index_ = loop;
+                    current_ = iterators_.begin()+loop;
+                }
+            }
+
+            if (index_ == outer_->num_partitions_)
+                end();
+            else
+            {
+                BOOST_ASSERT((*current_)->second.size() == 1);
+                value_ = std::make_pair((*current_)->first, *(*current_)->second.begin());
+            }
+        }
+
+      private:
+        typedef
+        std::vector<typename intermediates_t::value_type::const_iterator>
+        iterators_t;
+
+        in_memory                     const *outer_;        // parent container
+        iterators_t                          iterators_;    // iterator group
+        unsigned                             index_;        // index of current element
+        keyvalue_t                           value_;        // value of current element
+        typename iterators_t::const_iterator current_;      // iterator of current element
+
+        friend class in_memory;
+    };
+    friend class const_result_iterator;
 
-    in_memory(unsigned const num_partitions)
+    in_memory(unsigned const num_partitions=1)
       : num_partitions_(num_partitions)
     {
         intermediates_.resize(num_partitions_);
     }
 
+    const_result_iterator begin_results(void) const
+    {
+        return const_result_iterator(this).begin();
+    }
+
+    const_result_iterator end_results(void) const
+    {
+        return const_result_iterator(this).end();
+    }
+
+    void swap(in_memory &other)
+    {
+        std::swap(intermediates_, other.intermediates_);
+    }
+
     template<typename Callback>
-    void reduce(unsigned const partition, Callback &callback, results &result)
+    void reduce(unsigned const partition, Callback &callback)
     {
-        typename intermediates_t::value_type &map = intermediates_[partition];
+        typename intermediates_t::value_type map;
+        std::swap(map, intermediates_[partition]);
+
         for (typename intermediates_t::value_type::const_iterator it1=map.begin(); it1!=map.end(); ++it1)
         {
             callback(it1->first, it1->second.begin(), it1->second.end());
-            ++result.counters.reduce_keys_executed;
         }
     }
 
@@ -99,11 +214,20 @@
         other.intermediates_.clear();
     }
 
+    template<typename StoreResult>
+    bool const insert(typename reduce_task_type::key_type   const &key,
+                      typename reduce_task_type::value_type const &value,
+                      StoreResult &store_result)
+    {
+        store_result(key, value);
+        return insert(key, value);
+    }
 
     bool const insert(typename reduce_task_type::key_type   const &key,
                       typename reduce_task_type::value_type const &value)
     {
-        typename intermediates_t::value_type &map = intermediates_[partitioner_(key, num_partitions_)];
+        unsigned const partition = (num_partitions_ == 1)? 0 : partitioner_(key, num_partitions_);
+        typename intermediates_t::value_type &map = intermediates_[partition];
 
         map.insert(
             make_pair(
Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp	(original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -141,6 +141,98 @@
     typedef MapTask    map_task_type;
     typedef ReduceTask reduce_task_type;
     typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
+    typedef
+    std::pair<
+        typename reduce_task_type::key_type,
+        typename reduce_task_type::value_type>
+    keyvalue_t;
+
+    class const_result_iterator
+      : public boost::iterator_facade<
+            const_result_iterator,
+            keyvalue_t const,
+            boost::forward_traversal_tag>
+    {
+        friend class boost::iterator_core_access;
+
+      protected:
+        explicit const_result_iterator(local_disk const *outer)
+          : outer_(outer)
+        {
+            BOOST_ASSERT(outer_);
+            kvlist_.resize(outer_->num_partitions_);
+        }
+
+        void increment(void)
+        {
+            if (!kvlist_[index_].first->eof())
+                read_record(*kvlist_[index_].first, kvlist_[index_].second.first, kvlist_[index_].second.second);
+            set_current();
+        }
+
+        bool const equal(const_result_iterator const &other) const
+        {
+            return (kvlist_.size() == 0  &&  other.kvlist_.size() == 0)
+               ||  (kvlist_.size() > 0
+               &&  other.kvlist_.size() > 0
+               &&  kvlist_[index_].second == other.kvlist_[index_].second);
+        }
+
+        const_result_iterator &begin(void)
+        {
+            for (unsigned loop=0; loop<outer_->num_partitions_; ++loop)
+            {
+                kvlist_[loop] = std::make_pair(boost::shared_ptr<std::ifstream>(new std::ifstream), keyvalue_t());
+                kvlist_[loop].first->open(outer_->intermediate_files_.find(loop)->second.first.c_str());
+                BOOST_ASSERT(kvlist_[loop].first->is_open());
+                read_record(*kvlist_[loop].first, kvlist_[loop].second.first, kvlist_[loop].second.second);
+            }
+            set_current();
+            return *this;
+        }
+
+        const_result_iterator &end(void)
+        {
+            index_ = 0;
+            kvlist_.clear();
+            return *this;
+        }
+
+        keyvalue_t const &dereference(void) const
+        {
+            return kvlist_[index_].second;
+        }
+
+        void set_current(void)
+        {
+            index_ = 0;
+            while (index_<outer_->num_partitions_  &&  kvlist_[index_].first->eof())
+                 ++index_;
+            
+            for (unsigned loop=index_+1; loop<outer_->num_partitions_; ++loop)
+            {
+                if (!kvlist_[loop].first->eof()  &&  !kvlist_[index_].first->eof()  &&  kvlist_[index_].second > kvlist_[loop].second)
+                    index_ = loop;
+            }
+
+            if (index_ == outer_->num_partitions_)
+                end();
+        }
+
+      private:
+        local_disk                    const *outer_;        // parent container
+        unsigned                             index_;        // index of current element
+        typedef
+        std::vector<
+            std::pair<
+                boost::shared_ptr<std::ifstream>,
+                keyvalue_t> >
+        kvlist_t;
+        kvlist_t kvlist_;
+
+        friend class local_disk;
+    };
+    friend class const_result_iterator;
 
     local_disk(unsigned const num_partitions)
       : num_partitions_(num_partitions)
@@ -163,6 +255,25 @@
         }
     }
 
+    const_result_iterator begin_results(void) const
+    {
+        return const_result_iterator(this).begin();
+    }
+
+    const_result_iterator end_results(void) const
+    {
+        return const_result_iterator(this).end();
+    }
+
+    template<typename StoreResult>
+    bool const insert(typename reduce_task_type::key_type   const &key,
+                      typename reduce_task_type::value_type const &value,
+                      StoreResult &store_result)
+    {
+        store_result(key, value);
+        return insert(key, value);
+    }
+
     bool const insert(typename reduce_task_type::key_type   const &key,
                       typename reduce_task_type::value_type const &value)
     {
@@ -206,25 +317,22 @@
             std::swap(infilename, outfilename);
 
             std::string key, last_key;
+            typename reduce_task_type::value_type value;
             std::ifstream infile(infilename.c_str());
-            while (!infile.eof())
+            while (read_record(infile, key, value))
             {
-                typename reduce_task_type::value_type value;
-                if (read_record(infile, key, value))
+                if (key != last_key  &&  key.length() > 0)
                 {
-                    if (key != last_key  &&  key.length() > 0)
+                    if (last_key.length() > 0)
+                        fn_obj.finish(last_key, *this);
+                    if (key.length() > 0)
                     {
-                        if (last_key.length() > 0)
-                            fn_obj.finish(last_key, *this);
-                        if (key.length() > 0)
-                        {
-                            fn_obj.start(key);
-                            std::swap(key, last_key);
-                        }
+                        fn_obj.start(key);
+                        std::swap(key, last_key);
                     }
-
-                    fn_obj(value);
                 }
+
+                fn_obj(value);
             }
 
             if (last_key.length() > 0)
@@ -275,18 +383,19 @@
     }
 
     template<typename Callback>
-    void reduce(unsigned const partition, Callback &callback, results &result)
+    void reduce(unsigned const partition, Callback &callback)
     {
+        intermediates_t::iterator it = intermediate_files_.find(partition);
+        BOOST_ASSERT(it != intermediate_files_.end());
+
+        std::string filename;
+        std::swap(filename, it->second.first);
+        intermediate_files_.erase(it);
+
         typename reduce_task_type::key_type   key;
         typename reduce_task_type::key_type   last_key;
         typename reduce_task_type::value_type value;
         std::list<typename reduce_task_type::value_type> values;
-
-        std::list<std::string> filenames;
-        intermediates_t::const_iterator it = intermediate_files_.find(partition);
-        BOOST_ASSERT(it != intermediate_files_.end());
-
-        std::string const &filename = it->second.first;
         std::ifstream infile(filename.c_str());
         while (read_record(infile, key, value))
         {
@@ -294,7 +403,6 @@
             {
                 if (length(last_key) > 0)
                 {
-                    ++result.counters.reduce_keys_executed;
                     callback(last_key, values.begin(), values.end());
                     values.clear();
                 }
@@ -307,12 +415,13 @@
 
         if (length(last_key) > 0)
         {
-            ++result.counters.reduce_keys_executed;
             callback(last_key, values.begin(), values.end());
         }
 
         infile.close();
         boost::filesystem::remove(filename.c_str());
+
+        intermediate_files_.find(partition)->second.second->close();
     }
 
   protected:
@@ -365,11 +474,15 @@
     void close_files(void)
     {
         for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
+        {
             if (it->second.second  &&  it->second.second->is_open())
                 it->second.second->close();
+        }
     }
 
   private:
+    typedef enum { map_phase, reduce_phase } phase_t;
+
     unsigned const  num_partitions_;
     intermediates_t intermediate_files_;
     PartitionFn     partitioner_;
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp	(original)
+++ sandbox/boost/mapreduce/job.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -49,7 +49,14 @@
     typedef Datasource              datasource_type;
     typedef IntermediateStore       intermediate_store_type;
     typedef Combiner                combiner_type;
-    typedef std::list<std::string>  filenames_t;
+
+    typedef
+    typename intermediate_store_type::const_result_iterator
+    const_result_iterator;
+
+    typedef
+    typename intermediate_store_type::keyvalue_t
+    keyvalue_t;
 
   public:
     class map_task_runner : boost::noncopyable
@@ -93,27 +100,42 @@
     {
       public:
         reduce_task_runner(
-            std::string   const &output_filespec,
-            unsigned      const  partition,
-            unsigned      const  num_partitions)
-          : store_result_(output_filespec, partition, num_partitions)
+            std::string       const &output_filespec,
+            unsigned          const &partition,
+            unsigned          const  num_partitions,
+            intermediate_store_type &intermediate_store,
+            results                 &result)
+          : partition_(partition),
+            result_(result),
+            intermediate_store_(intermediate_store),
+            store_result_(output_filespec, partition, num_partitions)
+        {
+        }
+
+        void reduce(void)
         {
+            intermediate_store_.reduce(partition_, *this);
         }
 
         void emit(typename reduce_task_type::key_type   const &key,
                   typename reduce_task_type::value_type const &value)
         {
-            store_result_(key, value);
+            intermediate_store_.insert(key, value, store_result_);
         }
 
         template<typename It>
         void operator()(typename reduce_task_type::key_type const &key, It it, It ite)
         {
+            ++result_.counters.reduce_keys_executed;
             reduce_task_type::reduce(*this, key, it, ite);
+            ++result_.counters.reduce_keys_completed;
         }
 
       private:
-        StoreResult store_result_;
+        unsigned const          &partition_;
+        results                 &result_;
+        intermediate_store_type &intermediate_store_;
+        StoreResult              store_result_;
     };
 
     job(datasource_type &datasource, specification const &spec)
@@ -123,6 +145,16 @@
      {
      }
 
+    const_result_iterator begin_results(void) const
+    {
+        return intermediate_store_.begin_results();
+    }
+
+    const_result_iterator end_results(void) const
+    {
+        return intermediate_store_.end_results();
+    }
+
     bool const get_next_map_key(void *&key)
     {
         std::auto_ptr<typename map_task_type::key_type> next_key(new typename map_task_type::key_type);
@@ -218,9 +250,10 @@
             reduce_task_runner runner(
                 specification_.output_filespec,
                 partition,
-                number_of_partitions());
-
-            intermediate_store_.reduce(partition, runner, result);
+                number_of_partitions(),
+                intermediate_store_,
+                result);
+            runner.reduce();
         }
         catch (std::exception &e)
         {
Modified: sandbox/boost/mapreduce/mergesort.hpp
==============================================================================
--- sandbox/boost/mapreduce/mergesort.hpp	(original)
+++ sandbox/boost/mapreduce/mergesort.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -42,6 +42,12 @@
     return first.second < second.second;
 }
 
+template<typename T>
+bool const greater_2nd(T const &first, T const &second)
+{
+    return first.second > second.second;
+}
+
 struct key_offset_compare
 {
   public:
Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp	(original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -24,7 +24,7 @@
 namespace detail {
 
 template<typename Job>
-inline void run_next_map_task(Job &job, results &result, boost::mutex &m1, boost::mutex &m2)
+inline void run_next_map_task(Job &job, boost::mutex &m1, boost::mutex &m2, results &result)
 {
     try
     {
@@ -48,11 +48,22 @@
 }
 
 template<typename Job>
-inline void run_next_reduce_task(Job &job, unsigned &partition, results &result)
+inline void run_next_reduce_task(Job &job, unsigned &partition, boost::mutex &mutex, results &result)
 {
     try
     {
-        job.run_reduce_task(partition, result);
+        while (1)
+        {
+            boost::mutex::scoped_lock l(mutex);
+            unsigned  part = partition++;
+            if (part < job.number_of_partitions())
+            {
+                l.unlock();
+                job.run_reduce_task(part, result);
+            }
+            else
+                break;
+        }
     }
     catch (std::exception &e)
     {
@@ -90,9 +101,9 @@
                 new boost::thread(
                     detail::run_next_map_task<Job>,
                     boost::ref(job),
-                    boost::ref(*this_result),
                     boost::ref(m1),
-                    boost::ref(m2));
+                    boost::ref(m2),
+                    boost::ref(*this_result));
             map_threads.add_thread(thread);
         }
         map_threads.join_all();
@@ -116,6 +127,7 @@
                     detail::run_next_reduce_task<Job>,
                     boost::ref(job),
                     boost::ref(partition),
+                    boost::ref(m1),
                     boost::ref(*this_result));
             reduce_threads.add_thread(thread);
         }
Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp	(original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -152,6 +152,11 @@
         if (argc > 2)
             spec.map_tasks = atoi(argv[2]);
 
+        if (argc > 3)
+            spec.reduce_tasks = atoi(argv[3]);
+        else
+            spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
+
         std::cout << "\nRunning CPU Parallel MapReduce...";
         boost::mapreduce::run<wordcount::job>(spec, result);
         std::cout << "\nCPU Parallel MapReduce Finished.";
@@ -161,30 +166,30 @@
         std::cout << std::endl << "Error: " << e.what();
     }
 
-    std::cout << std::endl << "\n" << "MapReduce statistics:";
-    std::cout << "\n  " << "MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
-    std::cout << "\n  " << "  Map phase runtime                       : " << result.map_runtime << " seconds";
-    std::cout << "\n  " << "  Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
-    std::cout << "\n\n  " << "Map:";
-    std::cout << "\n    " << "Total Map keys                          : " << result.counters.map_keys_executed;
-    std::cout << "\n    " << "Map keys processed                      : " << result.counters.map_keys_completed;
-    std::cout << "\n    " << "Map key processing errors               : " << result.counters.map_key_errors;
-    std::cout << "\n    " << "Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
-    std::cout << "\n    " << "Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    " << "Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    " << "Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+    std::cout << std::endl << "\nMapReduce statistics:";
+    std::cout << "\n  MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
+    std::cout << "\n    Map phase runtime                       : " << result.map_runtime << " seconds";
+    std::cout << "\n    Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
+    std::cout << "\n\n  Map:";
+    std::cout << "\n    Total Map keys                          : " << result.counters.map_keys_executed;
+    std::cout << "\n    Map keys processed                      : " << result.counters.map_keys_completed;
+    std::cout << "\n    Map key processing errors               : " << result.counters.map_key_errors;
+    std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
+    std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+    std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+    std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
 
-    std::cout << "\n\n  " << "Reduce:";
-    std::cout << "\n    " << "Total Reduce keys                       : " << result.counters.reduce_keys_executed;
-    std::cout << "\n    " << "Reduce keys processed                   : " << result.counters.reduce_keys_completed;
-    std::cout << "\n    " << "Reduce key processing errors            : " << result.counters.reduce_key_errors;
-    std::cout << "\n    " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
-    std::cout << "\n    " << "Number of Result Files                  : " << result.counters.num_result_files;
+    std::cout << "\n\n  Reduce:";
+    std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
+    std::cout << "\n    Reduce keys processed                   : " << result.counters.reduce_keys_completed;
+    std::cout << "\n    Reduce key processing errors            : " << result.counters.reduce_key_errors;
+    std::cout << "\n    Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+    std::cout << "\n    Number of Result Files                  : " << result.counters.num_result_files;
     if (result.reduce_times.size() > 0)
     {
-        std::cout << "\n    " << "Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    " << "Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    " << "Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+        std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
     }
 
     return 0;
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp	(original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp	2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -14,13 +14,14 @@
 #define WORD_COUNT_MEMORY_MAP_FILE
 #define USE_WORDCOUNT_COMBINER
 #define USE_IN_MEMORY_INTERMEDIATES
+//#define WRITE_OUTPUT_FILES
 
 #if defined(_DEBUG)
 #   if 0
 #       define RUN_SEQUENTIAL_MAP_REDUCE
 #   endif
 #else
-#   define BOOST_DISABLE_ASSERTS
+//#   define BOOST_DISABLE_ASSERTS
 #endif
  
 #if !defined(_DEBUG) &&  !defined(BOOST_DISABLE_ASSERTS)
@@ -180,7 +181,9 @@
 #else
   , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
 #endif
+#if defined(USE_IN_MEMORY_INTERMEDIATES)  &&  defined(WRITE_OUTPUT_FILES)
   , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task>
+#endif
 > job;
 
 
@@ -230,7 +233,7 @@
     _CrtSetDbgFlag(_CrtSetDbgFlag(_CRTDBG_REPORT_FLAG) | _CRTDBG_LEAK_CHECK_DF);
 #endif
 
-    std::cout << "MapReduce Wordcount Application";
+    std::cout << "MapReduce test program";
     if (argc < 2)
     {
         std::cerr << "Usage: wordcount directory [num_map_tasks]\n";
@@ -246,64 +249,97 @@
     std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
     std::cout << "\n" << typeid(wordcount::job).name() << "\n";
 
-    try
-    {
 #ifdef RUN_SEQUENTIAL_MAP_REDUCE
-        std::cout << "\nRunning Sequential MapReduce...";
+    std::cout << "\nRunning Sequential MapReduce...";
 
-        spec.map_tasks = 1;
-        spec.reduce_tasks = 1;
+    spec.map_tasks = 1;
+    spec.reduce_tasks = 1;
 
-        wordcount::job job(datasource, spec);
+    wordcount::job job(datasource, spec);
+    try
+    {
         job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
-        std::cout << "\nFinished.";
+        std::cout << "\nSequential MapReduce Finished.";
 #else
-        std::cout << "\nRunning CPU Parallel MapReduce...";
-        spec.reduce_tasks = 1;
+    std::cout << "\nRunning CPU Parallel MapReduce...";
 
-        if (argc > 2)
-            spec.map_tasks = atoi(argv[2]);
+    if (argc > 2)
+        spec.map_tasks = atoi(argv[2]);
 
-        // this method can be called, but since we want access to the result data,
-        // we need to have a job object to interrogate
-        //boost::mapreduce::run<wordcount::job>(spec, result);
+    if (argc > 3)
+        spec.reduce_tasks = atoi(argv[3]);
+    else
+        spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
 
-        wordcount::job job(datasource, spec);
+    // this method can be called, but since we want access to the result data,
+    // we need to have a job object to interrogate
+    //boost::mapreduce::run<wordcount::job>(spec, result);
+
+    wordcount::job job(datasource, spec);
+    try
+    {
         job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
-        std::cout << "\nCPU Parallel MapReduce Finished.";
+        std::cout << "\nCPU Parallel MapReduce Finished.\n";
 #endif
     }
     catch (std::exception &e)
     {
-        std::cout << std::endl << "Error: " << e.what();
+        std::cout << std::endl << "Error running MapReduce: " << e.what();
     }
 
-    typedef std::pair<wordcount::reduce_task::key_type, wordcount::reduce_task::value_type> keyvalue_t;
-
-    std::cout << std::endl << "\n" << "MapReduce statistics:";
-    std::cout << "\n  " << "MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
-    std::cout << "\n  " << "  Map phase runtime                       : " << result.map_runtime << " seconds";
-    std::cout << "\n  " << "  Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
-    std::cout << "\n\n  " << "Map:";
-    std::cout << "\n    " << "Total Map keys                          : " << result.counters.map_keys_executed;
-    std::cout << "\n    " << "Map keys processed                      : " << result.counters.map_keys_completed;
-    std::cout << "\n    " << "Map key processing errors               : " << result.counters.map_key_errors;
-    std::cout << "\n    " << "Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
-    std::cout << "\n    " << "Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    " << "Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-    std::cout << "\n    " << "Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
-
-    std::cout << "\n\n  " << "Reduce:";
-    std::cout << "\n    " << "Total Reduce keys                       : " << result.counters.reduce_keys_executed;
-    std::cout << "\n    " << "Reduce keys processed                   : " << result.counters.reduce_keys_completed;
-    std::cout << "\n    " << "Reduce key processing errors            : " << result.counters.reduce_key_errors;
-    std::cout << "\n    " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
-    std::cout << "\n    " << "Number of Result Files                  : " << result.counters.num_result_files;
+    std::cout << "\nMapReduce statistics:";
+    std::cout << "\n  MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
+    std::cout << "\n    Map phase runtime                       : " << result.map_runtime << " seconds";
+    std::cout << "\n    Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
+    std::cout << "\n\n  Map:";
+    std::cout << "\n    Total Map keys                          : " << result.counters.map_keys_executed;
+    std::cout << "\n    Map keys processed                      : " << result.counters.map_keys_completed;
+    std::cout << "\n    Map key processing errors               : " << result.counters.map_key_errors;
+    std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
+    std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+    std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+    std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+
+    std::cout << "\n\n  Reduce:";
+    std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
+    std::cout << "\n    Reduce keys processed                   : " << result.counters.reduce_keys_completed;
+    std::cout << "\n    Reduce key processing errors            : " << result.counters.reduce_key_errors;
+    std::cout << "\n    Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+    std::cout << "\n    Number of Result Files                  : " << result.counters.num_result_files;
     if (result.reduce_times.size() > 0)
     {
-        std::cout << "\n    " << "Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    " << "Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-        std::cout << "\n    " << "Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+        std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+    }
+
+    wordcount::job::const_result_iterator it  = job.begin_results();
+    wordcount::job::const_result_iterator ite = job.end_results();
+    if (it != ite)
+    {
+        typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
+        frequencies_t frequencies;
+        frequencies.push_back(*it);
+        frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
+        for (++it; it!=ite; ++it)
+        {
+            if (frequencies.size() < 10)    // show top 10
+            {
+                frequencies.push_back(*it);
+                if (it->second < it_smallest->second)
+                    it_smallest = frequencies.rbegin();
+            }
+            else if (it->second > it_smallest->second)
+            {
+                *it_smallest = *it;
+                it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
+            }
+        }
+
+        frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
+        std::cout << "\n\nMapReduce results:";
+        for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
+            std::cout << "\n" << freq->first << "\t" << freq->second;
     }
 
     return 0;