ck_ring.h: make _ck_ring_enqueue_mp less failure happy

With preemption, it is possible for _ck_ring_enqueue_mp to have a
snapshot of p_head so stale with respect to the later snapshot of
c_head that a comparison modulo (small) ring size will erroneously
conclude that the ring is full.

Detect that case and retry rather than failing.  We only retry when
the enqueuers have made global forward progress, so the first loop
is as lock-free as it ever was.

Bonus: the new condition should be marginally faster.
ck_pring
Paul Khuong 8 years ago
parent b87563b7bd
commit 44a6527f33

@ -176,7 +176,7 @@ _ck_ring_enqueue_mp(struct ck_ring *ring,
producer = ck_pr_load_uint(&ring->p_head); producer = ck_pr_load_uint(&ring->p_head);
do { for (;;) {
/* /*
* The snapshot of producer must be up to date with * The snapshot of producer must be up to date with
* respect to consumer. * respect to consumer.
@ -185,14 +185,49 @@ _ck_ring_enqueue_mp(struct ck_ring *ring,
consumer = ck_pr_load_uint(&ring->c_head); consumer = ck_pr_load_uint(&ring->c_head);
delta = producer + 1; delta = producer + 1;
if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) { /*
* Only try to CAS if the producer is not clearly
* stale (not less than consumer) and the buffer is
* definitely not full.
*/
if (CK_CC_LIKELY((producer - consumer) < mask)) {
if (ck_pr_cas_uint_value(&ring->p_head,
producer,
delta,
&producer)) {
break;
}
} else {
unsigned int new_producer;
/*
* Slow path. Either the buffer is full or we
* have a stale snapshot of p_head. Execute a
* second read of p_read that must be ordered
* wrt the snapshot of c_head.
*/
ck_pr_fence_load();
new_producer = ck_pr_load_uint(&ring->p_head);
/*
* Only fail if we haven't made forward
* progress in production: the buffer must
* have been full when we read new_producer
* (or we wrapped around UINT_MAX during this
* iteration).
*/
if (producer == new_producer) {
r = false; r = false;
goto leave; goto leave;
} }
} while (ck_pr_cas_uint_value(&ring->p_head,
producer, /*
delta, * p_head advanced during this iteration. Try
&producer) == false); * again.
*/
producer = new_producer;
}
}
buffer = (char *)buffer + ts * (producer & mask); buffer = (char *)buffer + ts * (producer & mask);
memcpy(buffer, entry, ts); memcpy(buffer, entry, ts);

Loading…
Cancel
Save