<?php $include_dir="/home/hyper-archives/boost-commit/include"; include("$include_dir/msg-header.inc") ?>
Subject: [Boost-commit] svn:boost r86316 - in trunk: boost/interprocess/ipc libs/interprocess/doc libs/interprocess/test
From: igaztanaga_at_[hidden]
Date: 2013-10-15 04:02:10
Author: igaztanaga
Date: 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013)
New Revision: 86316
URL: http://svn.boost.org/trac/boost/changeset/86316
Log:
Fixes #9221 ("message_queue deadlock on linux")
Text files modified: 
   trunk/boost/interprocess/ipc/message_queue.hpp      |   190 ++++++++++++++++++++++++++------------- 
   trunk/libs/interprocess/doc/interprocess.qbk        |    10 ++                                      
   trunk/libs/interprocess/test/message_queue_test.cpp |    84 +++++++++++++++++                       
   3 files changed, 220 insertions(+), 64 deletions(-)
Modified: trunk/boost/interprocess/ipc/message_queue.hpp
==============================================================================
--- trunk/boost/interprocess/ipc/message_queue.hpp	Tue Oct 15 01:23:53 2013	(r86315)
+++ trunk/boost/interprocess/ipc/message_queue.hpp	2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013)	(r86316)
@@ -306,6 +306,8 @@
          m_cur_num_msg(0)
          #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
          ,m_cur_first_msg(0u)
+         ,m_blocked_senders(0u)
+         ,m_blocked_receivers(0u)
          #endif
       {  this->initialize_memory();  }
 
@@ -376,17 +378,17 @@
    {
       iterator it_inserted_ptr_end = this->inserted_ptr_end();
       iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
-      if(where == it_inserted_ptr_end){
-         ++m_cur_num_msg;
-         return **it_inserted_ptr_end;
-      }
-      else if(where == it_inserted_ptr_beg){
+      if(where == it_inserted_ptr_beg){
          //unsigned integer guarantees underflow
          m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
          --m_cur_first_msg;
          ++m_cur_num_msg;
          return *mp_index[m_cur_first_msg];
       }
+      else if(where == it_inserted_ptr_end){
+         ++m_cur_num_msg;
+         return **it_inserted_ptr_end;
+      }
       else{
          size_type pos  = where - &mp_index[0];
          size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
@@ -452,7 +454,7 @@
       }
    }
 
-   #else
+   #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
 
    typedef msg_hdr_ptr_t *iterator;
 
@@ -482,7 +484,7 @@
       return **pos;
    }
 
-   #endif
+   #endif   //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
 
    //!Inserts the first free message in the priority queue
    msg_header & queue_free_msg(unsigned int priority)
@@ -507,7 +509,7 @@
             //Check where the free message should be placed
             it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
          }
-         
+
       }
       //Insert the free message in the correct position
       return this->insert_at(it);
@@ -577,6 +579,8 @@
    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
    //Current start offset in the circular index
    size_type                  m_cur_first_msg;
+   size_type                  m_blocked_senders;
+   size_type                  m_blocked_receivers;
    #endif
 };
 
@@ -714,41 +718,67 @@
       throw interprocess_exception(size_error);
    }
 
-   bool was_empty = false;
+   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+   bool notify_blocked_receivers = false;
+   #endif
    //---------------------------------------------
    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
    //---------------------------------------------
    {
       //If the queue is full execute blocking logic
       if (p_hdr->is_full()) {
-         switch(block){
-            case non_blocking :
-               return false;
-            break;
-
-            case blocking :
-               do{
-                  p_hdr->m_cond_send.wait(lock);
-               }
-               while (p_hdr->is_full());
-            break;
-
-            case timed :
-               do{
-                  if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
-                     if(p_hdr->is_full())
-                        return false;
-                     break;
+         BOOST_TRY{
+            #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+            ++p_hdr->m_blocked_senders;
+            #endif
+            switch(block){
+               case non_blocking :
+                  #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+                  --p_hdr->m_blocked_senders;
+                  #endif
+                  return false;
+               break;
+
+               case blocking :
+                  do{
+                     p_hdr->m_cond_send.wait(lock);
+                  }
+                  while (p_hdr->is_full());
+               break;
+
+               case timed :
+                  do{
+                     if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
+                        if(p_hdr->is_full()){
+                           #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+                           --p_hdr->m_blocked_senders;
+                           #endif
+                           return false;
+                        }
+                        break;
+                     }
                   }
-               }
-               while (p_hdr->is_full());
-            break;
-            default:
-            break;
+                  while (p_hdr->is_full());
+               break;
+               default:
+               break;
+            }
+            #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+            --p_hdr->m_blocked_senders;
+            #endif
+         }
+         BOOST_CATCH(...){
+            #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+            --p_hdr->m_blocked_senders;
+            #endif
+            BOOST_RETHROW;
          }
+         BOOST_CATCH_END
       }
 
-      was_empty = p_hdr->is_empty();
+      #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+      notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
+      #endif
       //Insert the first free message in the priority queue
       ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
 
@@ -767,9 +797,13 @@
    //Notify outside lock to avoid contention. This might produce some
    //spurious wakeups, but it's usually far better than notifying inside.
    //Wake one receiver: only if some were blocked when the circular index is enabled, always otherwise
-   if (was_empty){
+   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+   if (notify_blocked_receivers){
       p_hdr->m_cond_recv.notify_one();
    }
+   #else
+   p_hdr->m_cond_recv.notify_one();
+   #endif
 
    return true;
 }
@@ -811,42 +845,70 @@
       throw interprocess_exception(size_error);
    }
 
-   bool was_full = false;
+   #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+   bool notify_blocked_senders = false;
+   #endif
    //---------------------------------------------
    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
    //---------------------------------------------
    {
       //If there are no messages execute blocking logic
       if (p_hdr->is_empty()) {
-         switch(block){
-            case non_blocking :
-               return false;
-            break;
-
-            case blocking :
-               do{
-                  p_hdr->m_cond_recv.wait(lock);
-               }
-               while (p_hdr->is_empty());
-            break;
-
-            case timed :
-               do{
-                  if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
-                     if(p_hdr->is_empty())
-                        return false;
-                     break;
+         BOOST_TRY{
+            #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+            ++p_hdr->m_blocked_receivers;
+            #endif
+            switch(block){
+               case non_blocking :
+                  #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+                  --p_hdr->m_blocked_receivers;
+                  #endif
+                  return false;
+               break;
+
+               case blocking :
+                  do{
+                     p_hdr->m_cond_recv.wait(lock);
+                  }
+                  while (p_hdr->is_empty());
+               break;
+
+               case timed :
+                  do{
+                     if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
+                        if(p_hdr->is_empty()){
+                           #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+                           --p_hdr->m_blocked_receivers;
+                           #endif
+                           return false;
+                        }
+                        break;
+                     }
                   }
-               }
-               while (p_hdr->is_empty());
-            break;
-
-            //Paranoia check
-            default:
-            break;
+                  while (p_hdr->is_empty());
+               break;
+
+               //Paranoia check
+               default:
+               break;
+            }
+            #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+            --p_hdr->m_blocked_receivers;
+            #endif
+         }
+         BOOST_CATCH(...){
+            #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+            --p_hdr->m_blocked_receivers;
+            #endif
+            BOOST_RETHROW;
          }
+         BOOST_CATCH_END
       }
 
+      #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+      notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
+      #endif
+
       //There is at least one message ready to pick, get the top one
       ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
 
@@ -861,8 +923,6 @@
       //Copy data to receiver's buffers
       std::memcpy(buffer, top_msg.data(), recvd_size);
 
-      was_full = p_hdr->is_full();
-
       //Free top message and put it in the free message list
       p_hdr->free_top_msg();
    }  //Lock end
@@ -870,9 +930,13 @@
    //Notify outside lock to avoid contention. This might produce some
    //spurious wakeups, but it's usually far better than notifying inside.
    //Wake one sender: only if some were blocked when the circular index is enabled, always otherwise
-   if (was_full){
+   #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+   if (notify_blocked_senders){
       p_hdr->m_cond_send.notify_one();
    }
+   #else
+   p_hdr->m_cond_send.notify_one();
+   #endif
 
    return true;
 }
Modified: trunk/libs/interprocess/doc/interprocess.qbk
==============================================================================
--- trunk/libs/interprocess/doc/interprocess.qbk	Tue Oct 15 01:23:53 2013	(r86315)
+++ trunk/libs/interprocess/doc/interprocess.qbk	2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013)	(r86316)
@@ -6713,6 +6713,16 @@
 
 [section:release_notes Release Notes]
 
+[section:release_notes_boost_1_56_00 Boost 1.56 Release]
+*  Fixed bugs: 
+   * [@https://svn.boost.org/trac/boost/ticket/9221 #9221 ("message_queue deadlock on linux")].
+      
+*  [*ABI breaking]: [@https://svn.boost.org/trac/boost/ticket/9221 #9221] showed
+   that the `BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX` option of message queue
+   was completely broken, so an ABI break was necessary to have a working implementation.
+
+[endsect]
+
 [section:release_notes_boost_1_55_00 Boost 1.55 Release]
 
 *  Fixed bugs [@https://svn.boost.org/trac/boost/ticket/7156 #7156],
Modified: trunk/libs/interprocess/test/message_queue_test.cpp
==============================================================================
--- trunk/libs/interprocess/test/message_queue_test.cpp	Tue Oct 15 01:23:53 2013	(r86315)
+++ trunk/libs/interprocess/test/message_queue_test.cpp	2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013)	(r86316)
@@ -17,6 +17,7 @@
 #include <boost/interprocess/allocators/node_allocator.hpp>
 #include <boost/interprocess/detail/os_thread_functions.hpp>
 #include <vector>
+#include <iostream>
 #include <cstddef>
 #include <limits>
 #include <memory>
@@ -222,12 +223,12 @@
    return true;
 }
 //]
+
 static const int MsgSize = 10;
 static const int NumMsg  = 1000;
 static char msgsend [10];
 static char msgrecv [10];
 
-
 static boost::interprocess::message_queue *pmessage_queue;
 
 void receiver()
@@ -267,6 +268,83 @@
    return true;
 }
 
+
+//////////////////////////////////////////////////////////////////////////////
+//
+// test_multi_sender_receiver is based on Alexander (aalutov's)
+// testcase for ticket #9221. Many thanks.
+//
+//////////////////////////////////////////////////////////////////////////////
+
+static boost::interprocess::message_queue *global_queue = 0;   //Queue used by the multi-sender/multi-receiver thread functions
+//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
+static const int MULTI_NUM_MSG_PER_SENDER = 10000;
+//Message queue message capacity (integer division makes this expression evaluate to 1)
+static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
+//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
+static const int MULTI_THREAD_COUNT = 10;
+
+static void multisend()   //Sender thread body: queues MULTI_NUM_MSG_PER_SENDER one-byte messages, then a zero-length one
+{
+	char buff = 0;   //The payload value is irrelevant, but initialize it so no indeterminate byte is sent
+	for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
+		global_queue->send(&buff, 1, 0);   //One byte, priority 0
+	}
+	global_queue->send(&buff, 0, 0);   //Final zero-length message, priority 0
+	//std::cout<<"writer thread complete"<<std::endl;
+}
+
+static void multireceive()   //Receiver thread body: receives from global_queue until a zero-length message arrives
+{
+	char buff;
+	size_t size;   //Passed to receive() and read back after every call
+   int received_msgs = 0;
+	unsigned int priority;
+	do {
+		global_queue->receive(&buff, 1, size, priority);
+      ++received_msgs;
+	} while (size > 0);   //A received size of zero ends the loop
+   --received_msgs;   //Do not count the zero-length message that ended the loop
+	//std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
+}
+
+
+bool test_multi_sender_receiver()   //Runs MULTI_THREAD_COUNT senders and receivers on one queue; returns false if a std::exception is caught
+{
+   bool ret = true;
+   //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
+	try {
+		boost::interprocess::message_queue::remove(test::get_process_id_name());   //Remove any queue already registered under this name
+		boost::interprocess::message_queue mq
+         (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);   //Capacity MULTI_QUEUE_SIZE; the trailing 1 is presumably the maximum message size -- confirm against the constructor
+      global_queue = &mq;   //Make the queue reachable from multisend/multireceive
+      std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);   //First half holds senders, second half receivers
+
+      //Launch the sender threads
+		for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+         boost::interprocess::ipcdetail::thread_launch
+            (threads[i], &multisend);
+		}
+
+		for (int i = 0; i < MULTI_THREAD_COUNT; i++) {   //Launch the receiver threads
+         boost::interprocess::ipcdetail::thread_launch
+            (threads[MULTI_THREAD_COUNT+i], &multireceive);
+		}
+
+		for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {   //Join every sender and receiver
+         boost::interprocess::ipcdetail::thread_join(threads[i]);
+         //std::cout << "Joined thread " << i << std::endl;
+		}
+	}
+   catch (std::exception &e) {   //Report the error and mark the test as failed
+		std::cout << "error " << e.what() << std::endl;
+      ret = false;
+	}
+   boost::interprocess::message_queue::remove(test::get_process_id_name());   //Remove the queue whether or not the test succeeded
+	return ret;
+}
+
+
 int main ()
 {
    if(!test_priority_order()){
@@ -281,6 +359,10 @@
       return 1;
    }
 
+   if(!test_multi_sender_receiver()){
+      return 1;
+   }
+
    return 0;
 }