diff --git a/include/ck_ring.h b/include/ck_ring.h
index 707a8fb..c6ba47b 100644
--- a/include/ck_ring.h
+++ b/include/ck_ring.h
@@ -47,6 +47,11 @@ struct ck_ring {
 };
 typedef struct ck_ring ck_ring_t;
 
+struct ck_ring_buffer {
+	void *value;
+};
+typedef struct ck_ring_buffer ck_ring_buffer_t;
+
 CK_CC_INLINE static unsigned int
 ck_ring_size(struct ck_ring *ring)
 {
@@ -64,11 +69,6 @@ ck_ring_capacity(struct ck_ring *ring)
 	return ring->size;
 }
 
-struct ck_ring_buffer {
-	void *ring;
-};
-typedef struct ck_ring_buffer ck_ring_buffer_t;
-
 /*
  * Atomically enqueues the specified entry. Returns true on success, returns
  * false if the ck_ring is full. This operation only support one active
@@ -81,13 +81,13 @@ typedef struct ck_ring_buffer ck_ring_buffer_t;
  * writer.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
+ck_ring_enqueue_spsc_size(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
     void *entry,
     unsigned int *size)
 {
 	unsigned int consumer, producer, delta;
 	unsigned int mask = ring->mask;
-	void **ring_buf = buf.ring;
 
 	consumer = ck_pr_load_uint(&ring->c_head);
 	producer = ring->p_tail;
@@ -97,7 +97,7 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
 	if ((delta & mask) == (consumer & mask))
 		return false;
 
-	ring_buf[producer & mask] = entry;
+	buffer[producer & mask].value = entry;
 
 	/*
 	 * Make sure to update slot value before indicating
@@ -115,11 +115,12 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
  * of ck_ring_dequeue_spsc.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
+ck_ring_enqueue_spsc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *entry)
 {
 	unsigned int consumer, producer, delta;
 	unsigned int mask = ring->mask;
-	void **ring_buf = buf.ring;
 
 	consumer = ck_pr_load_uint(&ring->c_head);
 	producer = ring->p_tail;
@@ -128,7 +129,7 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
 	if ((delta & mask) == (consumer & mask))
 		return false;
 
-	ring_buf[producer & mask] = entry;
+	buffer[producer & mask].value = entry;
 
 	/*
 	 * Make sure to update slot value before indicating
@@ -143,11 +144,12 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
  * Single consumer and single producer ring buffer dequeue (consumer).
  */
 CK_CC_INLINE static bool
-ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
+ck_ring_dequeue_spsc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *data)
 {
 	unsigned int consumer, producer;
 	unsigned int mask = ring->mask;
-	void **ring_buf = buf.ring;
 
 	consumer = ring->c_head;
 	producer = ck_pr_load_uint(&ring->p_tail);
@@ -168,7 +170,7 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
 	 * troublesome on platforms where sizeof(void *)
 	 * is not guaranteed to be sizeof(T *).
 	 */
-	ck_pr_store_ptr(data, ring_buf[consumer & mask]);
+	ck_pr_store_ptr(data, buffer[consumer & mask].value);
 	ck_pr_fence_store();
 	ck_pr_store_uint(&ring->c_head, consumer + 1);
 	return true;
@@ -186,12 +188,13 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
  * writer.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spmc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
+ck_ring_enqueue_spmc_size(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
     void *entry,
     unsigned int *size)
 {
 
-	return ck_ring_enqueue_spsc_size(ring, buf, entry, size);
+	return ck_ring_enqueue_spsc_size(ring, buffer, entry, size);
 }
 
 /*
@@ -201,18 +204,21 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
  * invocations of ck_ring_dequeue_spmc.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
+ck_ring_enqueue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *entry)
 {
 
-	return ck_ring_enqueue_spsc(ring, buf, entry);
+	return ck_ring_enqueue_spsc(ring, buffer, entry);
 }
 
 CK_CC_INLINE static bool
-ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
+ck_ring_trydequeue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *data)
 {
 	unsigned int consumer, producer;
 	unsigned int mask = ring->mask;
-	void **ring_buf = buf.ring;
 
 	consumer = ck_pr_load_uint(&ring->c_head);
 	ck_pr_fence_load();
@@ -222,19 +228,20 @@ ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
 		return false;
 
 	ck_pr_fence_load();
-	ck_pr_store_ptr(data, ring_buf[consumer & mask]);
+	ck_pr_store_ptr(data, buffer[consumer & mask].value);
 	ck_pr_fence_memory();
 
 	return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
 }
 
 CK_CC_INLINE static bool
-ck_ring_dequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
+ck_ring_dequeue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *data)
 {
 	unsigned int consumer, producer;
 	unsigned int mask = ring->mask;
 	void *r;
-	void **ring_buf = buf.ring;
 
 	consumer = ck_pr_load_uint(&ring->c_head);
 
@@ -258,7 +265,7 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
 		 * volatile load to force volatile semantics while allowing
 		 * for r itself to remain aliased across the loop.
 		 */
-		r = ck_pr_load_ptr(&ring_buf[consumer & mask]);
+		r = ck_pr_load_ptr(&buffer[consumer & mask].value);
 
 		/* Serialize load with respect to head update. */
 		ck_pr_fence_memory();
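
Usage note (not part of the patch): after this change the caller owns the slot array as an array of struct ck_ring_buffer and passes it by pointer to every enqueue/dequeue call, rather than passing a ck_ring_buffer_t wrapper by value. A minimal single-producer/single-consumer sketch follows; it assumes ck_ring_init(ring, size) with a power-of-two size, which is not shown in this diff.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#include <ck_ring.h>

#define RING_SIZE 1024 /* ring sizes must be a power of two */

int
main(void)
{
	ck_ring_t ring;
	ck_ring_buffer_t *buf;
	int item = 42;
	void *result;

	/* The slot array is allocated by the caller and handed to
	 * each operation by pointer. */
	buf = malloc(sizeof(ck_ring_buffer_t) * RING_SIZE);
	if (buf == NULL)
		return EXIT_FAILURE;

	/* Assumed signature: ck_ring_init is not part of this diff. */
	ck_ring_init(&ring, RING_SIZE);

	if (ck_ring_enqueue_spsc(&ring, buf, &item) == false)
		return EXIT_FAILURE;

	if (ck_ring_dequeue_spsc(&ring, buf, &result) == false)
		return EXIT_FAILURE;

	printf("dequeued %d\n", *(int *)result);
	free(buf);
	return EXIT_SUCCESS;
}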