diff --git a/include/ck_ring.h b/include/ck_ring.h
index e531f0b..62d9f4b 100644
--- a/include/ck_ring.h
+++ b/include/ck_ring.h
@@ -69,105 +69,80 @@ ck_ring_capacity(const struct ck_ring *ring)
 	return ring->size;
 }
 
-#define CK_RING_ENQUEUE_SP_DEFINE(ring, buffer, entry, ts, size, P) do {\
-	unsigned int consumer, producer, delta;				\
-	unsigned int mask = ring->mask;					\
-									\
-	consumer = ck_pr_load_uint(&ring->c_head);			\
-	producer = ring->p_tail;					\
-	delta = producer + 1;						\
-	if (P)								\
-		*(unsigned int *)size = (producer - consumer) & mask;	\
-									\
-	if ((delta & mask) == (consumer & mask))			\
-		return false;						\
-									\
-	buffer = (char *)buffer + ts * (producer & mask);		\
-	memcpy(buffer, entry, ts);					\
-									\
-	/*								\
-	 * Make sure to update slot value before indicating		\
-	 * that the slot is available for consumption.			\
-	 */								\
-	ck_pr_fence_store();						\
-	ck_pr_store_uint(&ring->p_tail, delta);				\
-	return true;							\
-} while (0)
-
-/*
- * This variant of ck_ring_enqueue_spsc returns the snapshot of queue length
- * with respect to the linearization point. This can be used to extract ring
- * size without incurring additional cacheline invalidation overhead from the
- * writer.
- */
-CK_CC_INLINE static bool
-_ck_ring_enqueue_spsc_size(struct ck_ring *ring,
-    void *restrict buffer,
-    const void *restrict entry,
-    unsigned int ts,
-    unsigned int *size)
+CK_CC_INLINE static void
+ck_ring_init(struct ck_ring *ring, unsigned int size)
 {
-	CK_RING_ENQUEUE_SP_DEFINE(ring, buffer, entry, ts,
-	    size, true);
+	ring->size = size;
+	ring->mask = size - 1;
+	ring->p_tail = 0;
+	ring->p_head = 0;
+	ring->c_head = 0;
+	return;
 }
 
 /*
- * Atomically enqueues the specified entry. Returns true on success, returns
- * false if the ck_ring is full. This operation only support one active
- * invocation at a time and works in the presence of a concurrent invocation
- * of ck_ring_dequeue_spsc.
+ * The _ck_ring_* namespace is internal only and must not be used externally.
  */
-CK_CC_INLINE static bool
-_ck_ring_enqueue_spsc(struct ck_ring *ring,
-    void *restrict destination,
-    const void *restrict source,
-    unsigned int ts)
+CK_CC_FORCE_INLINE static bool
+_ck_ring_enqueue_sp(struct ck_ring *ring,
+    void *CK_CC_RESTRICT buffer,
+    const void *CK_CC_RESTRICT entry,
+    unsigned int ts,
+    unsigned int *size)
 {
+	const unsigned int mask = ring->mask;
+	unsigned int consumer, producer, delta;
 
-	CK_RING_ENQUEUE_SP_DEFINE(ring, destination, source,
-	    ts, NULL, false);
-}
+	consumer = ck_pr_load_uint(&ring->c_head);
+	producer = ring->p_tail;
+	delta = producer + 1;
+	if (size != NULL)
+		*size = (producer - consumer) & mask;
 
-#undef CK_RING_ENQUEUE_SP_DEFINE
+	if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
+		return false;
 
-CK_CC_INLINE static bool
-ck_ring_enqueue_spsc_size(struct ck_ring *ring,
-    struct ck_ring_buffer *buffer,
-    const void *entry,
-    unsigned int *size)
-{
+	buffer = (char *)buffer + ts * (producer & mask);
+	memcpy(buffer, entry, ts);
 
-	return _ck_ring_enqueue_spsc_size(ring, buffer, &entry,
-	    sizeof(entry), size);
+	/*
+	 * Make sure to update slot value before indicating
+	 * that the slot is available for consumption.
+ */ + ck_pr_fence_store(); + ck_pr_store_uint(&ring->p_tail, delta); + return true; } -CK_CC_INLINE static bool -ck_ring_enqueue_spsc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - const void *entry) +CK_CC_FORCE_INLINE static bool +_ck_ring_enqueue_sp_size(struct ck_ring *ring, + void *CK_CC_RESTRICT buffer, + const void *CK_CC_RESTRICT entry, + unsigned int ts, + unsigned int *size) { + unsigned int sz; + bool r; - return _ck_ring_enqueue_spsc(ring, buffer, - &entry, sizeof(entry)); + r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz); + *size = sz; + return r; } -/* - * Single consumer and single producer ring buffer dequeue (consumer). - */ -CK_CC_INLINE static bool -_ck_ring_dequeue_spsc(struct ck_ring *ring, - const void *restrict buffer, - void *restrict target, +CK_CC_FORCE_INLINE static bool +_ck_ring_dequeue_sc(struct ck_ring *ring, + const void *CK_CC_RESTRICT buffer, + void *CK_CC_RESTRICT target, unsigned int size) { + const unsigned int mask = ring->mask; unsigned int consumer, producer; - unsigned int mask = ring->mask; consumer = ring->c_head; producer = ck_pr_load_uint(&ring->p_tail); - if (consumer == producer) + if (CK_CC_UNLIKELY(consumer == producer)) return false; /* @@ -188,67 +163,90 @@ _ck_ring_dequeue_spsc(struct ck_ring *ring, return true; } -CK_CC_INLINE static bool -ck_ring_dequeue_spsc(struct ck_ring *ring, - const struct ck_ring_buffer *buffer, - void *data) -{ - - return _ck_ring_dequeue_spsc(ring, buffer, - data, sizeof(data)); -} - -/* - * Atomically enqueues the specified entry. Returns true on success, returns - * false if the ck_ring is full. This operation only support one active - * invocation at a time and works in the presence of up to UINT_MAX concurrent - * invocations of ck_ring_dequeue_spmc. - * - * This variant of ck_ring_enqueue_spmc returns the snapshot of queue length - * with respect to the linearization point. This can be used to extract ring - * size without incurring additional cacheline invalidation overhead from the - * writer. - */ -CK_CC_INLINE static bool -ck_ring_enqueue_spmc_size(struct ck_ring *ring, - struct ck_ring_buffer *buffer, +CK_CC_FORCE_INLINE static bool +_ck_ring_enqueue_mp(struct ck_ring *ring, + void *buffer, const void *entry, + unsigned int ts, unsigned int *size) { + const unsigned int mask = ring->mask; + unsigned int producer, consumer, delta; + bool r = true; + + producer = ck_pr_load_uint(&ring->p_head); - return ck_ring_enqueue_spsc_size(ring, buffer, - entry, size); + do { + /* + * The snapshot of producer must be up to date with + * respect to consumer. + */ + ck_pr_fence_load(); + consumer = ck_pr_load_uint(&ring->c_head); + + delta = producer + 1; + if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) { + r = false; + goto leave; + } + } while (ck_pr_cas_uint_value(&ring->p_head, + producer, + delta, + &producer) == false); + + buffer = (char *)buffer + ts * (producer & mask); + memcpy(buffer, entry, ts); + + /* + * Wait until all concurrent producers have completed writing + * their data into the ring buffer. + */ + while (ck_pr_load_uint(&ring->p_tail) != producer) + ck_pr_stall(); + + /* + * Ensure that copy is completed before updating shared producer + * counter. + */ + ck_pr_fence_store(); + ck_pr_store_uint(&ring->p_tail, delta); + +leave: + if (size != NULL) + *size = (producer - consumer) & mask; + + return r; } -/* - * Atomically enqueues the specified entry. Returns true on success, returns - * false if the ck_ring is full. 
This operation only support one active - * invocation at a time and works in the presence of up to UINT_MAX concurrent - * invocations of ck_ring_dequeue_spmc. - */ -CK_CC_INLINE static bool -ck_ring_enqueue_spmc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - const void *entry) +CK_CC_FORCE_INLINE static bool +_ck_ring_enqueue_mp_size(struct ck_ring *ring, + void *buffer, + const void *entry, + unsigned int ts, + unsigned int *size) { + unsigned int sz; + bool r; - return ck_ring_enqueue_spsc(ring, buffer, entry); + r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz); + *size = sz; + return r; } -CK_CC_INLINE static bool -_ck_ring_trydequeue_spmc(struct ck_ring *ring, +CK_CC_FORCE_INLINE static bool +_ck_ring_trydequeue_mc(struct ck_ring *ring, const void *buffer, void *data, unsigned int size) { + const unsigned int mask = ring->mask; unsigned int consumer, producer; - unsigned int mask = ring->mask; consumer = ck_pr_load_uint(&ring->c_head); ck_pr_fence_load(); producer = ck_pr_load_uint(&ring->p_tail); - if (consumer == producer) + if (CK_CC_UNLIKELY(consumer == producer)) return false; ck_pr_fence_load(); @@ -260,18 +258,8 @@ _ck_ring_trydequeue_spmc(struct ck_ring *ring, return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1); } -CK_CC_INLINE static bool -ck_ring_trydequeue_spmc(struct ck_ring *ring, - const struct ck_ring_buffer *buffer, - void *data) -{ - - return _ck_ring_trydequeue_spmc(ring, - buffer, data, sizeof(data)); -} - -CK_CC_INLINE static bool -_ck_ring_dequeue_spmc(struct ck_ring *ring, +CK_CC_FORCE_INLINE static bool +_ck_ring_dequeue_mc(struct ck_ring *ring, const void *buffer, void *data, unsigned int ts) @@ -291,7 +279,7 @@ _ck_ring_dequeue_spmc(struct ck_ring *ring, ck_pr_fence_load(); producer = ck_pr_load_uint(&ring->p_tail); - if (consumer == producer) + if (CK_CC_UNLIKELY(consumer == producer)) return false; ck_pr_fence_load(); @@ -309,168 +297,166 @@ _ck_ring_dequeue_spmc(struct ck_ring *ring, return true; } +/* + * The ck_ring_*_spsc namespace is the public interface for interacting with a + * ring buffer containing pointers. Correctness is only provided if there is up + * to one concurrent consumer and up to one concurrent producer. + */ CK_CC_INLINE static bool -ck_ring_dequeue_spmc(struct ck_ring *ring, - const struct ck_ring_buffer *buffer, - void *data) +ck_ring_enqueue_spsc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry, + unsigned int *size) { - return _ck_ring_dequeue_spmc(ring, buffer, data, - sizeof(*buffer)); + return _ck_ring_enqueue_sp_size(ring, buffer, &entry, + sizeof(entry), size); } -#define CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, size, P) do {\ - const unsigned int mask = ring->mask; \ - unsigned int producer, consumer, delta; \ - bool r = true; \ - \ - producer = ck_pr_load_uint(&ring->p_head); \ - \ - do { \ - /* \ - * The snapshot of producer must be up to date with \ - * respect to consumer. \ - */ \ - ck_pr_fence_load(); \ - consumer = ck_pr_load_uint(&ring->c_head); \ - \ - delta = producer + 1; \ - if ((delta & mask) == (consumer & mask)) { \ - r = false; \ - goto leave; \ - } \ - } while (ck_pr_cas_uint_value(&ring->p_head, \ - producer, \ - delta, \ - &producer) == false); \ - \ - buffer = (char *)buffer + ts * (producer & mask); \ - memcpy(buffer, entry, ts); \ - \ - /* \ - * Wait until all concurrent producers have completed writing \ - * their data into the ring buffer. 
\ - */ \ - while (ck_pr_load_uint(&ring->p_tail) != producer) \ - ck_pr_stall(); \ - \ - /* \ - * Ensure that copy is completed before updating shared producer\ - * counter. \ - */ \ - ck_pr_fence_store(); \ - ck_pr_store_uint(&ring->p_tail, delta); \ - \ -leave: \ - if (P) \ - *(unsigned int *)size = (producer - consumer) & mask; \ - \ - return r; \ -} while (0) - CK_CC_INLINE static bool -_ck_ring_enqueue_mpsc_size(struct ck_ring *ring, - void *buffer, - const void *entry, - unsigned int ts, - unsigned int *size) +ck_ring_enqueue_spsc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry) { - CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, size, true); + return _ck_ring_enqueue_sp(ring, buffer, + &entry, sizeof(entry), NULL); } CK_CC_INLINE static bool -_ck_ring_enqueue_mpsc(struct ck_ring *ring, - void *buffer, - const void *entry, - unsigned int ts) +ck_ring_dequeue_spsc(struct ck_ring *ring, + const struct ck_ring_buffer *buffer, + void *data) { - CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, NULL, false); + return _ck_ring_dequeue_sc(ring, buffer, + data, sizeof(data)); } -#undef CK_RING_ENQUEUE_MP_DEFINE - +/* + * The ck_ring_*_mpmc namespace is the public interface for interacting with a + * ring buffer containing pointers. Correctness is provided for any number of + * producers and consumers. + */ CK_CC_INLINE static bool -ck_ring_enqueue_mpsc(struct ck_ring *ring, +ck_ring_enqueue_mpmc(struct ck_ring *ring, struct ck_ring_buffer *buffer, const void *entry) { - return _ck_ring_enqueue_mpsc(ring, buffer, &entry, sizeof(entry)); + return _ck_ring_enqueue_mp(ring, buffer, &entry, + sizeof(entry), NULL); } CK_CC_INLINE static bool -ck_ring_enqueue_mpsc_size(struct ck_ring *ring, +ck_ring_enqueue_mpmc_size(struct ck_ring *ring, struct ck_ring_buffer *buffer, const void *entry, unsigned int *size) { - return _ck_ring_enqueue_mpsc_size(ring, buffer, &entry, + return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry), size); } CK_CC_INLINE static bool -ck_ring_dequeue_mpsc(struct ck_ring *ring, +ck_ring_trydequeue_mpmc(struct ck_ring *ring, const struct ck_ring_buffer *buffer, void *data) { - return _ck_ring_dequeue_spsc(ring, buffer, data, - sizeof(data)); + return _ck_ring_trydequeue_mc(ring, + buffer, data, sizeof(data)); } CK_CC_INLINE static bool -ck_ring_enqueue_mpmc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - const void *entry) +ck_ring_dequeue_mpmc(struct ck_ring *ring, + const struct ck_ring_buffer *buffer, + void *data) { - return _ck_ring_enqueue_mpsc(ring, buffer, &entry, sizeof(entry)); + return _ck_ring_dequeue_mc(ring, buffer, data, + sizeof(data)); } +/* + * The ck_ring_*_spmc namespace is the public interface for interacting with a + * ring buffer containing pointers. Correctness is provided for any number of + * consumers with up to one concurrent producer. 
+ */
 CK_CC_INLINE static bool
-ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
+ck_ring_enqueue_spmc_size(struct ck_ring *ring,
     struct ck_ring_buffer *buffer,
     const void *entry,
     unsigned int *size)
 {
 
-	return _ck_ring_enqueue_mpsc_size(ring, buffer, &entry,
+	return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
 	    sizeof(entry), size);
 }
 
 CK_CC_INLINE static bool
-ck_ring_trydequeue_mpmc(struct ck_ring *ring,
+ck_ring_enqueue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    const void *entry)
+{
+
+	return _ck_ring_enqueue_sp(ring, buffer, &entry,
+	    sizeof(entry), NULL);
+}
+
+CK_CC_INLINE static bool
+ck_ring_trydequeue_spmc(struct ck_ring *ring,
     const struct ck_ring_buffer *buffer,
     void *data)
 {
 
-	return _ck_ring_trydequeue_spmc(ring,
-	    buffer, data, sizeof(data));
+	return _ck_ring_trydequeue_mc(ring, buffer, data, sizeof(data));
 }
 
 CK_CC_INLINE static bool
-ck_ring_dequeue_mpmc(struct ck_ring *ring,
+ck_ring_dequeue_spmc(struct ck_ring *ring,
     const struct ck_ring_buffer *buffer,
     void *data)
 {
 
-	return _ck_ring_dequeue_spmc(ring, buffer, data,
-	    sizeof(data));
+	return _ck_ring_dequeue_mc(ring, buffer, data, sizeof(data));
 }
 
-CK_CC_INLINE static void
-ck_ring_init(struct ck_ring *ring, unsigned int size)
+/*
+ * The ck_ring_*_mpsc namespace is the public interface for interacting with a
+ * ring buffer containing pointers. Correctness is provided for any number of
+ * producers with up to one concurrent consumer.
+ */
+CK_CC_INLINE static bool
+ck_ring_enqueue_mpsc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    const void *entry)
 {
-	ring->size = size;
-	ring->mask = size - 1;
-	ring->p_tail = 0;
-	ring->p_head = 0;
-	ring->c_head = 0;
-	return;
+	return _ck_ring_enqueue_mp(ring, buffer, &entry,
+	    sizeof(entry), NULL);
+}
+
+CK_CC_INLINE static bool
+ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    const void *entry,
+    unsigned int *size)
+{
+
+	return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
+	    sizeof(entry), size);
+}
+
+CK_CC_INLINE static bool
+ck_ring_dequeue_mpsc(struct ck_ring *ring,
+    const struct ck_ring_buffer *buffer,
+    void *data)
+{
+
+	return _ck_ring_dequeue_sc(ring, buffer, data,
+	    sizeof(data));
 }
 
 #define CK_RING_PROTOTYPE(name, type)			\
@@ -481,7 +467,7 @@ ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,	\
     unsigned int *d)					\
 {							\
 							\
-	return _ck_ring_enqueue_spsc_size(a, b, c,	\
+	return _ck_ring_enqueue_sp_size(a, b, c,	\
 	    sizeof(struct type), d);			\
 }							\
 							\
@@ -491,8 +477,8 @@ ck_ring_enqueue_spsc_##name(struct ck_ring *a,	\
     struct type *c)					\
 {							\
 							\
-	return _ck_ring_enqueue_spsc(a, b, c,		\
-	    sizeof(struct type));			\
+	return _ck_ring_enqueue_sp(a, b, c,		\
+	    sizeof(struct type), NULL);			\
 }							\
 							\
 CK_CC_INLINE static bool				\
@@ -501,7 +487,7 @@ ck_ring_dequeue_spsc_##name(struct ck_ring *a,	\
     struct type *c)					\
 {							\
 							\
-	return _ck_ring_dequeue_spsc(a, b, c,		\
+	return _ck_ring_dequeue_sc(a, b, c,		\
 	    sizeof(struct type));			\
 }							\
 							\
@@ -512,7 +498,7 @@ ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,	\
     unsigned int *d)					\
 {							\
 							\
-	return _ck_ring_enqueue_spsc_size(a, b, c,	\
+	return _ck_ring_enqueue_sp_size(a, b, c,	\
 	    sizeof(struct type), d);			\
 }							\
 							\
 CK_CC_INLINE static bool				\
@@ -522,8 +508,8 @@ ck_ring_enqueue_spmc_##name(struct ck_ring *a,	\
     struct type *c)					\
 {							\
 							\
-	return _ck_ring_enqueue_spsc(a, b, c,		\
-	    sizeof(struct type));			\
+	return _ck_ring_enqueue_sp(a, b, c,		\
+	    sizeof(struct type), NULL);			\
 }							\
 							\
 CK_CC_INLINE static bool				\
@@ -532,7 +518,7 @@ ck_ring_trydequeue_spmc_##name(struct ck_ring *a,	\
     struct type *c)					\
 {							\
 							\
-	return 
_ck_ring_trydequeue_spmc(a, \ + return _ck_ring_trydequeue_mc(a, \ b, c, sizeof(struct type)); \ } \ \ @@ -542,7 +528,7 @@ ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ struct type *c) \ { \ \ - return _ck_ring_dequeue_spmc(a, b, c, \ + return _ck_ring_dequeue_mc(a, b, c, \ sizeof(struct type)); \ } \ \ @@ -552,8 +538,8 @@ ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \ struct type *c) \ { \ \ - return _ck_ring_enqueue_mpsc(a, b, c, \ - sizeof(struct type)); \ + return _ck_ring_enqueue_mp(a, b, c, \ + sizeof(struct type), NULL); \ } \ \ CK_CC_INLINE static bool \ @@ -563,7 +549,7 @@ ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \ unsigned int *d) \ { \ \ - return _ck_ring_enqueue_mpsc_size(a, b, c, \ + return _ck_ring_enqueue_mp_size(a, b, c, \ sizeof(struct type), d); \ } \ \ @@ -573,7 +559,7 @@ ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \ struct type *c) \ { \ \ - return _ck_ring_dequeue_spsc(a, b, c, \ + return _ck_ring_dequeue_sc(a, b, c, \ sizeof(struct type)); \ } \ \ @@ -584,7 +570,7 @@ ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \ unsigned int *d) \ { \ \ - return _ck_ring_enqueue_mpsc_size(a, b, c, \ + return _ck_ring_enqueue_mp_size(a, b, c, \ sizeof(struct type), d); \ } \ \ @@ -594,8 +580,8 @@ ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \ struct type *c) \ { \ \ - return _ck_ring_enqueue_mpsc(a, b, c, \ - sizeof(struct type)); \ + return _ck_ring_enqueue_mp(a, b, c, \ + sizeof(struct type), NULL); \ } \ \ CK_CC_INLINE static bool \ @@ -604,7 +590,7 @@ ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \ struct type *c) \ { \ \ - return _ck_ring_trydequeue_spmc(a, \ + return _ck_ring_trydequeue_mc(a, \ b, c, sizeof(struct type)); \ } \ \ @@ -614,7 +600,7 @@ ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \ struct type *c) \ { \ \ - return _ck_ring_dequeue_spmc(a, b, c, \ + return _ck_ring_dequeue_mc(a, b, c, \ sizeof(struct type)); \ }
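
For reference, below is a minimal usage sketch of the public pointer-based interface this patch exposes. It is not part of the diff; the names RING_SIZE, buf, slot and the surrounding main() are illustrative only, and struct ck_ring_buffer is assumed to be the slot type defined earlier in this header. Two requirements follow directly from the code above: ck_ring_init() computes mask = size - 1, so the ring size must be a power of two, and the slot array is allocated and owned by the caller.

#include <ck_ring.h>

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* ck_ring_init() sets mask = size - 1, so the size must be a power of two. */
#define RING_SIZE 1024

int
main(void)
{
	struct ck_ring ring;
	struct ck_ring_buffer *buf;
	unsigned int length;
	void *slot;
	int value = 42;

	/* The caller owns the slot array; the ring only holds head/tail state. */
	buf = malloc(sizeof(*buf) * RING_SIZE);
	if (buf == NULL)
		return EXIT_FAILURE;

	ck_ring_init(&ring, RING_SIZE);

	/* Single-producer enqueue of a pointer value; fails when the ring is full. */
	if (ck_ring_enqueue_spsc(&ring, buf, &value) == false)
		return EXIT_FAILURE;

	/* The _size variant also reports occupancy at the enqueue's linearization point. */
	if (ck_ring_enqueue_spsc_size(&ring, buf, &value, &length) == true)
		printf("entries ahead of this one: %u\n", length);

	/* Single-consumer dequeue: the stored pointer is copied into slot. */
	while (ck_ring_dequeue_spsc(&ring, buf, &slot) == true)
		printf("dequeued %d\n", *(int *)slot);

	free(buf);
	return 0;
}

The spmc, mpsc and mpmc entry points take the same arguments; only the permitted producer/consumer concurrency differs, as described in the namespace comments. For rings of structures rather than pointers, CK_RING_PROTOTYPE(name, type) generates equivalent typed wrappers (ck_ring_enqueue_spsc_##name and friends) over the same _ck_ring_* primitives.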