diff --git a/include/ck_ring.h b/include/ck_ring.h index ffd06f1..4b9ffa7 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -75,6 +75,27 @@ return ring->size; \ } \ CK_CC_INLINE static bool \ + ck_ring_enqueue_spsc_size_##name(struct ck_ring_##name *ring, \ + struct type *entry, \ + unsigned int *size) \ + { \ + unsigned int consumer, producer, delta; \ + unsigned int mask = ring->mask; \ + \ + consumer = ck_pr_load_uint(&ring->c_head); \ + producer = ring->p_tail; \ + delta = producer + 1; \ + *size = (producer - consumer) & mask; \ + \ + if ((delta & mask) == (consumer & mask)) \ + return false; \ + \ + ring->ring[producer & mask] = *entry; \ + ck_pr_fence_store(); \ + ck_pr_store_uint(&ring->p_tail, delta); \ + return true; \ + } \ + CK_CC_INLINE static bool \ ck_ring_enqueue_spsc_##name(struct ck_ring_##name *ring, \ struct type *entry) \ { \ @@ -114,6 +135,13 @@ return true; \ } \ CK_CC_INLINE static bool \ + ck_ring_enqueue_spmc_size_##name(struct ck_ring_##name *ring, \ + void *entry, unsigned int *size) \ + { \ + \ + return ck_ring_enqueue_spsc_size_##name(ring, entry, size); \ + } \ + CK_CC_INLINE static bool \ ck_ring_enqueue_spmc_##name(struct ck_ring_##name *ring, void *entry) \ { \ \ @@ -217,6 +245,50 @@ ck_ring_capacity(struct ck_ring *ring) return ring->size; } +/* + * Atomically enqueues the specified entry. Returns true on success, returns + * false if the ck_ring is full. This operation only support one active + * invocation at a time and works in the presence of a concurrent invocation + * of ck_ring_dequeue_spsc. + * + * This variant of ck_ring_enqueue_spsc returns the snapshot of queue length + * with respect to the linearization point. This can be used to extract ring + * size without incurring additional cacheline invalidation overhead from the + * writer. + */ +CK_CC_INLINE static bool +ck_ring_enqueue_spsc_size(struct ck_ring *ring, + void *entry, + unsigned int *size) +{ + unsigned int consumer, producer, delta; + unsigned int mask = ring->mask; + + consumer = ck_pr_load_uint(&ring->c_head); + producer = ring->p_tail; + delta = producer + 1; + *size = (producer - consumer) & mask; + + if ((delta & mask) == (consumer & mask)) + return false; + + ring->ring[producer & mask] = entry; + + /* + * Make sure to update slot value before indicating + * that the slot is available for consumption. + */ + ck_pr_fence_store(); + ck_pr_store_uint(&ring->p_tail, delta); + return true; +} + +/* + * Atomically enqueues the specified entry. Returns true on success, returns + * false if the ck_ring is full. This operation only support one active + * invocation at a time and works in the presence of a concurrent invocation + * of ck_ring_dequeue_spsc. + */ CK_CC_INLINE static bool ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry) { @@ -275,6 +347,32 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, void *data) return true; } +/* + * Atomically enqueues the specified entry. Returns true on success, returns + * false if the ck_ring is full. This operation only support one active + * invocation at a time and works in the presence of up to UINT_MAX concurrent + * invocations of ck_ring_dequeue_spmc. + * + * This variant of ck_ring_enqueue_spmc returns the snapshot of queue length + * with respect to the linearization point. This can be used to extract ring + * size without incurring additional cacheline invalidation overhead from the + * writer. + */ +CK_CC_INLINE static bool +ck_ring_enqueue_spmc_size(struct ck_ring *ring, + void *entry, + unsigned int *size) +{ + + return ck_ring_enqueue_spsc_size(ring, entry, size); +} + +/* + * Atomically enqueues the specified entry. Returns true on success, returns + * false if the ck_ring is full. This operation only support one active + * invocation at a time and works in the presence of up to UINT_MAX concurrent + * invocations of ck_ring_dequeue_spmc. + */ CK_CC_INLINE static bool ck_ring_enqueue_spmc(struct ck_ring *ring, void *entry) {