<?php $include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r84401 - in branches/release: boost/lockfree boost/lockfree/detail libs/lockfree libs/lockfree/test
From: tim_at_[hidden]
Date: 2013-05-21 06:26:58
Author: timblechmann
Date: 2013-05-21 06:26:57 EDT (Tue, 21 May 2013)
New Revision: 84401
URL: http://svn.boost.org/trac/boost/changeset/84401
Log:
lockfree: merge fixes from trunk 
Properties modified: 
   branches/release/boost/lockfree/   (props changed)
   branches/release/libs/lockfree/   (props changed)
Text files modified: 
   branches/release/boost/lockfree/detail/atomic.hpp       |    14 +-                                      
   branches/release/boost/lockfree/spsc_queue.hpp          |     9                                         
   branches/release/libs/lockfree/test/spsc_queue_test.cpp |   203 +++++++++++++++++++++++++++++++++++++++ 
   3 files changed, 212 insertions(+), 14 deletions(-)
Modified: branches/release/boost/lockfree/detail/atomic.hpp
==============================================================================
--- branches/release/boost/lockfree/detail/atomic.hpp	(original)
+++ branches/release/boost/lockfree/detail/atomic.hpp	2013-05-21 06:26:57 EDT (Tue, 21 May 2013)
@@ -20,20 +20,20 @@
 // GCC supports atomic<> from version 4.8 onwards.
 #if defined(__GNUC__)
 # if defined(__GNUC_PATCHLEVEL__)
-#  define __GNUC_VERSION__ (__GNUC__ * 10000 \
-                            + __GNUC_MINOR__ * 100 \
-                            + __GNUC_PATCHLEVEL__)
+#  define BOOST_ATOMIC_GNUC_VERSION (__GNUC__ * 10000           \
+                                     + __GNUC_MINOR__ * 100     \
+                                     + __GNUC_PATCHLEVEL__)
 # else
-#  define __GNUC_VERSION__ (__GNUC__ * 10000 \
-                            + __GNUC_MINOR__ * 100)
+#  define BOOST_LOCKFREE_GNUC_VERSION (__GNUC__ * 10000         \
+                                     + __GNUC_MINOR__ * 100)
 # endif
 #endif
 
-#if (__GNUC_VERSION__ >= 40800) && (__cplusplus >= 201103L)
+#if (BOOST_LOCKFREE_GNUC_VERSION >= 40800) && (__cplusplus >= 201103L)
 #undef BOOST_LOCKFREE_NO_HDR_ATOMIC
 #endif
 
-#undef __GNUC_VERSION__
+#undef BOOST_LOCKFREE_GNUC_VERSION
 
 #if defined(BOOST_LOCKFREE_NO_HDR_ATOMIC)
 #include <boost/atomic.hpp>
Modified: branches/release/boost/lockfree/spsc_queue.hpp
==============================================================================
--- branches/release/boost/lockfree/spsc_queue.hpp	(original)
+++ branches/release/boost/lockfree/spsc_queue.hpp	2013-05-21 06:26:57 EDT (Tue, 21 May 2013)
@@ -284,11 +284,12 @@
     }
 };
 
-template <typename T, std::size_t max_size>
+template <typename T, std::size_t MaxSize>
 class compile_time_sized_ringbuffer:
     public ringbuffer_base<T>
 {
     typedef std::size_t size_t;
+    static const std::size_t max_size = MaxSize + 1;
     boost::array<T, max_size> array_;
 
 public:
@@ -349,7 +350,7 @@
 
 public:
     explicit runtime_sized_ringbuffer(size_t max_elements):
-        max_elements_(max_elements)
+        max_elements_(max_elements + 1)
     {
         // TODO: we don't necessarily need to construct all elements
         array_ = Alloc::allocate(max_elements);
@@ -359,7 +360,7 @@
 
     template <typename U>
     runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_t max_elements):
-        Alloc(alloc), max_elements_(max_elements)
+        Alloc(alloc), max_elements_(max_elements + 1)
     {
         // TODO: we don't necessarily need to construct all elements
         array_ = Alloc::allocate(max_elements);
@@ -368,7 +369,7 @@
     }
 
     runtime_sized_ringbuffer(Alloc const & alloc, size_t max_elements):
-        Alloc(alloc), max_elements_(max_elements)
+        Alloc(alloc), max_elements_(max_elements + 1)
     {
         // TODO: we don't necessarily need to construct all elements
         array_ = Alloc::allocate(max_elements);
Modified: branches/release/libs/lockfree/test/spsc_queue_test.cpp
==============================================================================
--- branches/release/libs/lockfree/test/spsc_queue_test.cpp	(original)
+++ branches/release/libs/lockfree/test/spsc_queue_test.cpp	2013-05-21 06:26:57 EDT (Tue, 21 May 2013)
@@ -131,6 +131,21 @@
     output_iterator_
 };
 
+BOOST_AUTO_TEST_CASE( spsc_queue_capacity_test )
+{
+    spsc_queue<int, capacity<2> > f;
+
+    BOOST_REQUIRE(f.push(1));
+    BOOST_REQUIRE(f.push(2));
+    BOOST_REQUIRE(!f.push(3));
+
+    spsc_queue<int> g(2);
+
+    BOOST_REQUIRE(g.push(1));
+    BOOST_REQUIRE(g.push(2));
+    BOOST_REQUIRE(!g.push(3));
+}
+
 
 template <int EnqueueMode>
 void spsc_queue_buffer_push_return_value(void)
@@ -162,15 +177,15 @@
 
     switch (EnqueueMode) {
     case pointer_and_size:
-        BOOST_REQUIRE_EQUAL(rb.push(data, xqueue_size), buffer_size - xqueue_size - 1);
+        BOOST_REQUIRE_EQUAL(rb.push(data, xqueue_size), buffer_size - xqueue_size);
         break;
 
     case reference_to_array:
-        BOOST_REQUIRE_EQUAL(rb.push(data), buffer_size - xqueue_size - 1);
+        BOOST_REQUIRE_EQUAL(rb.push(data), buffer_size - xqueue_size);
         break;
 
     case iterator_pair:
-        BOOST_REQUIRE_EQUAL(rb.push(data, data + xqueue_size), data + buffer_size - xqueue_size - 1);
+        BOOST_REQUIRE_EQUAL(rb.push(data, data + xqueue_size), data + buffer_size - xqueue_size);
         break;
 
     default:
@@ -291,3 +306,185 @@
     spsc_queue_buffer_pop<reference_to_array, 7, 16, 64>();
     spsc_queue_buffer_pop<output_iterator_, 7, 16, 64>();
 }
+
+
+#ifndef BOOST_LOCKFREE_STRESS_TEST
+static const boost::uint32_t nodes_per_thread = 100000;
+#else
+static const boost::uint32_t nodes_per_thread = 100000000;
+#endif
+
+struct spsc_queue_tester
+{
+    spsc_queue<int, capacity<128> > sf;
+
+    boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes;
+
+    static_hashed_set<int, 1<<16 > working_set;
+
+    spsc_queue_tester(void):
+        spsc_queue_cnt(0), received_nodes(0)
+    {}
+
+    void add(void)
+    {
+        for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) {
+            int id = generate_id<int>();
+            working_set.insert(id);
+
+            while (sf.push(id) == false)
+            {}
+
+            ++spsc_queue_cnt;
+        }
+    }
+
+    bool get_element(void)
+    {
+        int data;
+        bool success = sf.pop(data);
+
+        if (success) {
+            ++received_nodes;
+            --spsc_queue_cnt;
+            bool erased = working_set.erase(data);
+            assert(erased);
+            return true;
+        } else
+            return false;
+    }
+
+    boost::lockfree::detail::atomic<bool> running;
+
+    void get(void)
+    {
+        for(;;) {
+            bool success = get_element();
+            if (!running && !success)
+                return;
+        }
+    }
+
+    void run(void)
+    {
+        running = true;
+
+        BOOST_REQUIRE(sf.empty());
+
+        thread reader(boost::bind(&spsc_queue_tester::get, this));
+        thread writer(boost::bind(&spsc_queue_tester::add, this));
+        cout << "reader and writer threads created" << endl;
+
+        writer.join();
+        cout << "writer threads joined. waiting for readers to finish" << endl;
+
+        running = false;
+        reader.join();
+
+        BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
+        BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
+        BOOST_REQUIRE(sf.empty());
+        BOOST_REQUIRE(working_set.count_nodes() == 0);
+    }
+};
+
+BOOST_AUTO_TEST_CASE( spsc_queue_test_caching )
+{
+    boost::shared_ptr<spsc_queue_tester> test1(new spsc_queue_tester);
+    test1->run();
+}
+
+struct spsc_queue_tester_buffering
+{
+    spsc_queue<int, capacity<128> > sf;
+
+    boost::lockfree::detail::atomic<long> spsc_queue_cnt;
+
+    static_hashed_set<int, 1<<16 > working_set;
+    boost::lockfree::detail::atomic<long> received_nodes;
+
+    spsc_queue_tester_buffering(void):
+        spsc_queue_cnt(0), received_nodes(0)
+    {}
+
+    static const size_t buf_size = 5;
+
+    void add(void)
+    {
+        boost::array<int, buf_size> input_buffer;
+        for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) {
+            for (size_t i = 0; i != buf_size; ++i) {
+                int id = generate_id<int>();
+                working_set.insert(id);
+                input_buffer[i] = id;
+            }
+
+            size_t pushed = 0;
+
+            do {
+                pushed += sf.push(input_buffer.c_array() + pushed,
+                                  input_buffer.size()    - pushed);
+            } while (pushed != buf_size);
+
+            spsc_queue_cnt+=buf_size;
+        }
+    }
+
+    bool get_elements(void)
+    {
+        boost::array<int, buf_size> output_buffer;
+
+        size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size());
+
+        if (popd) {
+            received_nodes += popd;
+            spsc_queue_cnt -= popd;
+
+            for (size_t i = 0; i != popd; ++i) {
+                bool erased = working_set.erase(output_buffer[i]);
+                assert(erased);
+            }
+
+            return true;
+        } else
+            return false;
+    }
+
+    boost::lockfree::detail::atomic<bool> running;
+
+    void get(void)
+    {
+        for(;;) {
+            bool success = get_elements();
+            if (!running && !success)
+                return;
+        }
+    }
+
+    void run(void)
+    {
+        running = true;
+
+        thread reader(boost::bind(&spsc_queue_tester_buffering::get, this));
+        thread writer(boost::bind(&spsc_queue_tester_buffering::add, this));
+        cout << "reader and writer threads created" << endl;
+
+        writer.join();
+        cout << "writer threads joined. waiting for readers to finish" << endl;
+
+        running = false;
+        reader.join();
+
+        BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread);
+        BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0);
+        BOOST_REQUIRE(sf.empty());
+        BOOST_REQUIRE(working_set.count_nodes() == 0);
+    }
+};
+
+
+BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering )
+{
+    boost::shared_ptr<spsc_queue_tester_buffering> test1(new spsc_queue_tester_buffering);
+    test1->run();
+}