diff --git a/include/ck_ring.h b/include/ck_ring.h index 8a2a791..1213a21 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -176,7 +176,7 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, producer = ck_pr_load_uint(&ring->p_head); - do { + for (;;) { /* * The snapshot of producer must be up to date with * respect to consumer. @@ -185,14 +185,49 @@ _ck_ring_enqueue_mp(struct ck_ring *ring, consumer = ck_pr_load_uint(&ring->c_head); delta = producer + 1; - if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) { - r = false; - goto leave; + /* + * Only try to CAS if the producer is not clearly + * stale (not less than consumer) and the buffer is + * definitely not full. + */ + if (CK_CC_LIKELY((producer - consumer) < mask)) { + if (ck_pr_cas_uint_value(&ring->p_head, + producer, + delta, + &producer)) { + break; + } + } else { + unsigned int new_producer; + + /* + * Slow path. Either the buffer is full or we + * have a stale snapshot of p_head. Execute a + * second read of p_read that must be ordered + * wrt the snapshot of c_head. + */ + ck_pr_fence_load(); + new_producer = ck_pr_load_uint(&ring->p_head); + + /* + * Only fail if we haven't made forward + * progress in production: the buffer must + * have been full when we read new_producer + * (or we wrapped around UINT_MAX during this + * iteration). + */ + if (producer == new_producer) { + r = false; + goto leave; + } + + /* + * p_head advanced during this iteration. Try + * again. + */ + producer = new_producer; } - } while (ck_pr_cas_uint_value(&ring->p_head, - producer, - delta, - &producer) == false); + } buffer = (char *)buffer + ts * (producer & mask); memcpy(buffer, entry, ts);