From 44a6527f33d30b12ab16b6b7a4295aa6104d9c73 Mon Sep 17 00:00:00 2001 From: Paul Khuong Date: Fri, 23 Jun 2017 15:17:34 -0400 Subject: [PATCH] ck_ring.h: make _ck_ring_enqueue_mp less failure happy With preemption, it is possible for _ck_ring_enqueue_mp to have a snapshot of p_head so stale with respect to the later snapshot of c_head that a comparison modulo (small) ring size will erroneously conclude that the ring is full. Detect that case and retry rather than failing. We only retry when the enqueuers have made global forward progress, so the first loop is as lock-free as it ever was. Bonus: the new condition should be marginally faster. --- include/ck_ring.h | 51 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/include/ck_ring.h b/include/ck_ring.h index 8a2a791..1213a21 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -176,7 +176,7 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, producer = ck_pr_load_uint(&ring->p_head); - do { + for (;;) { /* * The snapshot of producer must be up to date with * respect to consumer. @@ -185,14 +185,49 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, consumer = ck_pr_load_uint(&ring->c_head); delta = producer + 1; - if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) { - r = false; - goto leave; + /* + * Only try to CAS if the producer is not clearly + * stale (not less than consumer) and the buffer is + * definitely not full. + */ + if (CK_CC_LIKELY((producer - consumer) < mask)) { + if (ck_pr_cas_uint_value(&ring->p_head, + producer, + delta, + &producer)) { + break; + } + } else { + unsigned int new_producer; + + /* + * Slow path. Either the buffer is full or we + * have a stale snapshot of p_head. Execute a + * second read of p_read that must be ordered + * wrt the snapshot of c_head. + */ + ck_pr_fence_load(); + new_producer = ck_pr_load_uint(&ring->p_head); + + /* + * Only fail if we haven't made forward + * progress in production: the buffer must + * have been full when we read new_producer + * (or we wrapped around UINT_MAX during this + * iteration). + */ + if (producer == new_producer) { + r = false; + goto leave; + } + + /* + * p_head advanced during this iteration. Try + * again. + */ + producer = new_producer; } - } while (ck_pr_cas_uint_value(&ring->p_head, - producer, - delta, - &producer) == false); + } buffer = (char *)buffer + ts * (producer & mask); memcpy(buffer, entry, ts);