<?php $include_dir="/home/hyper-archives/boost/include"; include("$include_dir/msg-header.inc") ?>
From: terekhov_at_[hidden]
Date: 2001-03-15 16:20:58
--- In boost_at_y..., williamkempf_at_h... wrote:
> --- In boost_at_y..., terekhov_at_y... wrote:
[...]
> > yes, indeed, it is very closely related. but here again the problem
> > is that signals get consumed in "unfair" fashion by some threads
> > multiple times (as non-detectable spurious wakeups) and some threads
> > could remain blocked on condition variable (BTW, that violates
> > CV "atomic" semantics with respect to mutex and other threads
> > calling signal/broadcast).
>
> Neither POSIX, nor more importantly Boost.Threads,
> guarantees "fairness" in this regard. It's a noble goal, and if
> you've a solution to attain it I'll gladly do so, but it's not a
> requirement.
the solutions presented below do not produce spurious wakeups
(neither detectable nor non-detectable) hence, do not have this
defect;
> (BTW, I don't see how the CV "atomic" semantics are
> violated here.)
example:
(count == 0)
t1. thread A: mutex.lock; count++; do cv.wait( mutex ) while count != 0;....
t2. thread B: mutex.lock; count++; do cv.wait( mutex ) while count != 0;....
(mutex is free; A and B are already "blocked" on CV)
t3. thread C: mutex.lock; if (count>1) {cv.broadcast;count=0;} mutex.unlock;...
Now, POSIX says:
--
a) ...functions atomically release mutex and cause the calling thread
to block on the condition variable cond; atomically here means
"atomically with respect to access by another thread to the
mutex and then the condition variable". That is, if another thread is
able to acquire the mutex after the about-to-block thread has released
it, then a subsequent call to pthread_cond_signal() or
pthread_cond_broadcast() in that thread behaves as if it were
issued after the about-to-block thread has blocked.
b) The pthread_cond_broadcast() call unblocks all threads currently
blocked on the specified condition variable cond.
--
cv.broadcast _should_ unblock thread B AND thread A!
the behaviour when thread A would consume two signals and
thread B would remain blocked IS INCORRECT.
> > i've posted multiple solutions (hopefully correct ones) to
> > comp.programming.threads/pthread_cond_wait() and WaitForSingleObject
> > if you want, i can post it here or send it directly to you..
>
> Either would be helpful.
Newsgroups:
comp.programming.threads
Subject:
Re: pthread_cond_wait() and
WaitForSingleObject()
Date:
Wed, 28 Feb 2001 16:43:36 +0100
> John Hickin wrote:
>
>
> On Doug Schmidt's web site you will find a paper that analyses several
> different approaches to providing posix-like conditional waits,
> including one using a CRITICAL_SECTIONs, semaphores, and
> SignalObjectAndWait:
>
> http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
>
> I can't say it any better than this web page says it:
>
> <quote>
> This article illustrates why developing condition variables on Win32
> platforms is tricky and error-prone. There are several subtle design
> forces that must be addressed by developers. In general, the different
> implementations we've examined vary according to their correctness,
> efficiency, fairness, and portability. No one solution provides all
> these qualities optimally.
> </quote>
>
> Note the final sentence.
>
> Regards, John.
I would like to add that no solution presented in this paper
implements condition variables correctly. I've reported to
ACE/pthread-win32 folks several problems such as lost signal,
double/nested broadcast deadlock, spurious wakeups,..
Louis Thomas <lthomas_at_[hidden]> and I tried to find
a correct and efficient solution. I think that the following
solutions (semaphore/event based) implement condition variables
correctly. I would greatly appreciate it if someone else would
spend some time verifying the correctness and efficiency of
the algorithms below.
regards,
alexander.
---------- Algorithm 6b ----------
gate - semaphore
queue - semaphore
counters - CS or mutex
ex_mutex - CS or mutex
nWaiters - int
nSignaled - int
nGone - int
wait(timeout) {
sem_wait gate // pass gate
nWaiters++
sem_post gate
unlock ex_mutex
_bTimedOut=sem_wait(queue, timeout)
lock counters
if (_bTimedOut && (0==nSignaled || 0!=nWaiters-nGone)) {
nGone++ // retract our waiter status (ignore overflow)
} else {
if (_bTimedOut) {
sem_wait queue // better now than spurious later
_bTimedOut=false
}
}
nSignaled--
if (0==nSignaled) {
sem_post gate // open gate
}
}
unlock counters
lock ex_mutex
return _bTimedOut?ETIMEDOUT:0
}
signal(bAll) {
_nSig=0
lock counters
// this is safe because nWaiting can only be decremented by a thread that
// owns counters and nGone can only be changed by a thread that owns counters.
if (nWaiting>nGone) {
if (0==nSignaled) {
sema_wait gate // close gate if not already closed
}
if (nGone>0) {
nWaiting-=nGone
nGone=0
}
_nSig=bAll?nWaiting:1
nSignaled+=_nSig
nWaiting-=_nSig
}
unlock counters
if (0!=_nSig) {
sema_post queue, _nSig
}
}
---------- Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL ------
given:
semBlockLock - bin.semaphore
semBlockQueue - semaphore
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
[auto: register int result ] // error checking omitted
[auto: register int nSignalsWasLeft ]
[auto: register int nWaitersWasGone ]
sem_wait( semBlockLock );
nWaitersBlocked++;
sem_post( semBlockLock );
unlock( mtxExternal );
bTimedOut = sem_wait( semBlockQueue,timeout );
lock( mtxUnblockLock );
if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
if ( bTimeout ) { // timeout (or canceled)
if ( 0 != nWaitersBlocked ) {
nWaitersBlocked--;
}
else {
nWaitersGone++; // count spurious wakeups
}
}
if ( 0 == --nWaitersToUnblock ) {
if ( 0 != nWaitersBlocked ) {
sem_post( semBlockLock ); // open the gate
nSignalsWasLeft = 0; // do not open the gate below again
}
else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
nWaitersGone = 0;
}
}
}
else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious semaphore :-)
sem_wait( semBlockLock );
nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
sem_post( semBlockLock );
nWaitersGone = 0;
}
unlock( mtxUnblockLock );
if ( 1 == nSignalsWasLeft ) {
if ( 0 != nWaitersWasGone ) {
// sem_adjust( semBlockQueue,-nWaitersWasGone );
while ( nWaitersWasGone-- ) {
sem_wait( semBlockQueue ); // better now than spurious later
}
}
sem_post( semBlockLock ); // open the gate
}
lock( mtxExternal );
return ( bTimedOut ) ? ETIMEOUT : 0;
}
signal(bAll) {
[auto: register int result ]
[auto: register int nSignalsToIssue]
lock( mtxUnblockLock );
if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
if ( 0 == nWaitersBlocked ) { // NO-OP
return unlock( mtxUnblockLock );
}
if (bAll) {
nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked;
nWaitersBlocked = 0;
}
else {
nSignalsToIssue = 1;
nWaitersToUnblock++;
nWaitersBlocked--;
}
}
else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
sem_wait( semBlockLock ); // close the gate
if ( 0 != nWaitersGone ) {
nWaitersBlocked -= nWaitersGone;
nWaitersGone = 0;
}
if (bAll) {
nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked;
nWaitersBlocked = 0;
}
else {
nSignalsToIssue = nWaitersToUnblock = 1;
nWaitersBlocked--;
}
}
else { // NO-OP
return unlock( mtxUnblockLock );
}
unlock( mtxUnblockLock );
sem_post( semBlockQueue,nSignalsToIssue );
return result;
}
---------- Algorithm 8b / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ONEBYONE ------
given:
semBlockLock - bin.semaphore
semBlockQueue - bin.semaphore
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
[auto: register int result ] // error checking omitted
[auto: register int nWaitersWasGone ]
[auto: register int nSignalsWasLeft ]
sem_wait( semBlockLock );
nWaitersBlocked++;
sem_post( semBlockLock );
unlock( mtxExternal );
bTimedOut = sem_wait( semBlockQueue,timeout );
lock( mtxUnblockLock );
if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
if ( bTimeout ) { // timeout (or canceled)
if ( 0 != nWaitersBlocked ) {
nWaitersBlocked--;
nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
}
else {
nWaitersGone = 1; // spurious wakeup pending!!
}
}
if ( 0 == --nWaitersToUnblock ) {
if ( 0 != nWaitersBlocked ) {
sem_post( semBlockLock ); // open the gate
nSignalsWasLeft = 0; // do not open the gate below again
}
else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
nWaitersGone = 0;
}
}
}
else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious semaphore :-)
sem_wait( semBlockLock );
nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
sem_post( semBlockLock );
nWaitersGone = 0;
}
unlock( mtxUnblockLock );
if ( 1 == nSignalsWasLeft ) {
if ( 0 != nWaitersWasGone ) {
// sem_adjust( semBlockQueue,-1 );
sem_wait( semBlockQueue ); // better now than spurious later
}
sem_post( semBlockLock ); // open the gate
}
else if ( 0 != nSignalsWasLeft ) {
sem_post( semBlockQueue ); // unblock next waiter
}
lock( mtxExternal );
return ( bTimedOut ) ? ETIMEOUT : 0;
}
signal(bAll) {
[auto: register int result ]
lock( mtxUnblockLock );
if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
if ( 0 == nWaitersBlocked ) { // NO-OP
return unlock( mtxUnblockLock );
}
if (bAll) {
nWaitersToUnblock += nWaitersBlocked;
nWaitersBlocked = 0;
}
else {
nWaitersToUnblock++;
nWaitersBlocked--;
}
unlock( mtxUnblockLock );
}
else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
sem_wait( semBlockLock ); // close the gate
if ( 0 != nWaitersGone ) {
nWaitersBlocked -= nWaitersGone;
nWaitersGone = 0;
}
if (bAll) {
nWaitersToUnblock = nWaitersBlocked;
nWaitersBlocked = 0;
}
else {
nWaitersToUnblock = 1;
nWaitersBlocked--;
}
unlock( mtxUnblockLock );
sem_post( semBlockQueue );
}
else { // NO-OP
unlock( mtxUnblockLock );
}
return result;
}
---------- Algorithm 8c / IMPL_EVENT,UNBLOCK_STRATEGY == UNBLOCK_ONEBYONE ---------
given:
hevBlockLock - auto-reset event
hevBlockQueue - auto-reset event
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
[auto: register int result ] // error checking omitted
[auto: register int nSignalsWasLeft ]
[auto: register int nWaitersWasGone ]
wait( hevBlockLock,INFINITE );
nWaitersBlocked++;
set_event( hevBlockLock );
unlock( mtxExternal );
bTimedOut = wait( hevBlockQueue,timeout );
lock( mtxUnblockLock );
if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
if ( bTimeout ) { // timeout (or canceled)
if ( 0 != nWaitersBlocked ) {
nWaitersBlocked--;
nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
}
else {
nWaitersGone = 1; // spurious wakeup pending!!
}
}
if ( 0 == --nWaitersToUnblock ) {
if ( 0 != nWaitersBlocked ) {
set_event( hevBlockLock ); // open the gate
nSignalsWasLeft = 0; // do not open the gate below again
}
else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
nWaitersGone = 0;
}
}
}
else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious event :-)
wait( hevBlockLock,INFINITE );
nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
set_event( hevBlockLock );
nWaitersGone = 0;
}
unlock( mtxUnblockLock );
if ( 1 == nSignalsWasLeft ) {
if ( 0 != nWaitersWasGone ) {
reset_event( hevBlockQueue ); // better now than spurious later
}
set_event( hevBlockLock ); // open the gate
}
else if ( 0 != nSignalsWasLeft ) {
set_event( hevBlockQueue ); // unblock next waiter
}
lock( mtxExternal );
return ( bTimedOut ) ? ETIMEOUT : 0;
}
signal(bAll) {
[auto: register int result ]
lock( mtxUnblockLock );
if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
if ( 0 == nWaitersBlocked ) { // NO-OP
return unlock( mtxUnblockLock );
}
if (bAll) {
nWaitersToUnblock += nWaitersBlocked;
nWaitersBlocked = 0;
}
else {
nWaitersToUnblock++;
nWaitersBlocked--;
}
unlock( mtxUnblockLock );
}
else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
wait( hevBlockLock,INFINITE ); // close the gate
if ( 0 != nWaitersGone ) {
nWaitersBlocked -= nWaitersGone;
nWaitersGone = 0;
}
if (bAll) {
nWaitersToUnblock = nWaitersBlocked;
nWaitersBlocked = 0;
}
else {
nWaitersToUnblock = 1;
nWaitersBlocked--;
}
unlock( mtxUnblockLock );
set_event( hevBlockQueue );
}
else { // NO-OP
unlock( mtxUnblockLock );
}
return result;
}
---------- Algorithm 8d / IMPL_EVENT,UNBLOCK_STRATEGY == UNBLOCK_ALL ------
given:
hevBlockLock - auto-reset event
hevBlockQueueS - auto-reset event // for signals
hevBlockQueueB - manual-reset event // for broadcasts
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
eBroadcast - int // 0: no broadcast, 1: broadcast, 2: broadcast after signal(s)
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
[auto: register int result ] // error checking omitted
[auto: register int eWasBroadcast ]
[auto: register int nSignalsWasLeft ]
[auto: register int nWaitersWasGone ]
wait( hevBlockLock,INFINITE );
nWaitersBlocked++;
set_event( hevBlockLock );
unlock( mtxExternal );
bTimedOut = waitformultiple(
hevBlockQueueS,hevBlockQueueB,timeout,ONE
);
lock( mtxUnblockLock );
if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
if ( bTimeout ) { // timeout (or canceled)
if ( 0 != nWaitersBlocked ) {
nWaitersBlocked--;
nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
}
else if ( 1 != eBroadcast ) {
nWaitersGone = 1;
}
}
if ( 0 == --nWaitersToUnblock ) {
if ( 0 != nWaitersBlocked ) {
set_event( hevBlockLock ); // open the gate
nSignalsWasLeft = 0; // do not open the gate below again
}
else {
if ( 0 != (eWasBroadcast = eBroadcast) ) {
eBroadcast = 0;
}
if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
nWaitersGone = 0;
}
}
}
else if ( 0 != eBroadcast ) {
nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
}
}
else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious event :-)
wait( hevBlockLock,INFINITE );
nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
set_event( hevBlockLock );
nWaitersGone = 0;
}
unlock( mtxUnblockLock );
if ( 1 == nSignalsWasLeft ) {
if ( 0 != eWasBroadcast ) {
reset_event( hevBlockQueueB );
}
if ( 0 != nWaitersWasGone ) {
reset_event( hevBlockQueueS ); // better now than spurious later
}
set_event( hevBlockLock ); // open the gate
}
else if ( 0 != nSignalsWasLeft ) {
set_event( hevBlockQueueS ); // unblock next waiter
}
lock( mtxExternal );
return ( bTimedOut ) ? ETIMEOUT : 0;
}
signal(bAll) {
[auto: register int result ]
[auto: register HANDLE hevBlockQueue ]
lock( mtxUnblockLock );
if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
if ( 0 == nWaitersBlocked ) { // NO-OP
return unlock( mtxUnblockLock );
}
if (bAll) {
nWaitersToUnblock += nWaitersBlocked;
nWaitersBlocked = 0;
eBroadcast = 2;
hevBlockQueue = hevBlockQueueB;
}
else {
nWaitersToUnblock++;
nWaitersBlocked--;
return unlock( mtxUnblockLock );
}
}
else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
wait( hevBlockLock,INFINITE ); // close the gate
if ( 0 != nWaitersGone ) {
nWaitersBlocked -= nWaitersGone;
nWaitersGone = 0;
}
if (bAll) {
nWaitersToUnblock = nWaitersBlocked;
nWaitersBlocked = 0;
eBroadcast = 1;
hevBlockQueue = hevBlockQueueB;
}
else {
nWaitersToUnblock = 1;
nWaitersBlocked--;
hevBlockQueue = hevBlockQueueS;
}
}
else { // NO-OP
return unlock( mtxUnblockLock );
}
unlock( mtxUnblockLock );
set_event( hevBlockQueue );
return result;
}