From 90c9950356ca22fbbf041094519593e7b443eded Mon Sep 17 00:00:00 2001 From: Samy Al Bahra Date: Wed, 25 Dec 2013 02:00:42 -0500 Subject: [PATCH] ck_ring: Type-agnostic backend implementation. This will serve to drive both pointer-based and type-specialized ring implementations. Some full fences have also been removed as they are unnecessary with the introduction of X_Y fence interface. --- include/ck_ring.h | 120 +++++++++++++++++++++++++++++++++------------- 1 file changed, 88 insertions(+), 32 deletions(-) diff --git a/include/ck_ring.h b/include/ck_ring.h index c6ba47b..da61a70 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -81,9 +81,10 @@ ck_ring_capacity(struct ck_ring *ring) * writer. */ CK_CC_INLINE static bool -ck_ring_enqueue_spsc_size(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - void *entry, +_ck_ring_enqueue_spsc_size(struct ck_ring *ring, + void *restrict buffer, + const void *restrict entry, + unsigned int type_size, unsigned int *size) { unsigned int consumer, producer, delta; @@ -97,7 +98,8 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, if ((delta & mask) == (consumer & mask)) return false; - buffer[producer & mask].value = entry; + buffer = (char *)buffer + type_size * (producer & mask); + memcpy(buffer, entry, type_size); /* * Make sure to update slot value before indicating @@ -108,6 +110,17 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, return true; } +CK_CC_INLINE static bool +ck_ring_enqueue_spsc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + void *entry, + unsigned int *size) +{ + + return _ck_ring_enqueue_spsc_size(ring, buffer, &entry, + sizeof(void *), size); +} + /* * Atomically enqueues the specified entry. Returns true on success, returns * false if the ck_ring is full. This operation only support one active @@ -115,9 +128,10 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, * of ck_ring_dequeue_spsc. */ CK_CC_INLINE static bool -ck_ring_enqueue_spsc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - void *entry) +_ck_ring_enqueue_spsc(struct ck_ring *ring, + void *restrict destination, + const void *restrict source, + unsigned int size) { unsigned int consumer, producer, delta; unsigned int mask = ring->mask; @@ -129,7 +143,8 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, if ((delta & mask) == (consumer & mask)) return false; - buffer[producer & mask].value = entry; + destination = (char *)destination + size * (producer & mask); + memcpy(destination, source, size); /* * Make sure to update slot value before indicating @@ -140,13 +155,24 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, return true; } +CK_CC_INLINE static bool +ck_ring_enqueue_spsc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry) +{ + + return _ck_ring_enqueue_spsc(ring, buffer, + &entry, sizeof(entry)); +} + /* * Single consumer and single producer ring buffer dequeue (consumer). */ CK_CC_INLINE static bool -ck_ring_dequeue_spsc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - void *data) +_ck_ring_dequeue_spsc(struct ck_ring *ring, + void *restrict buffer, + void *restrict target, + unsigned int size) { unsigned int consumer, producer; unsigned int mask = ring->mask; @@ -163,19 +189,28 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, */ ck_pr_fence_load(); + buffer = (char *)buffer + size * (consumer & mask); + memcpy(target, buffer, size); + /* - * This is used to work-around aliasing issues (C - * lacks a generic pointer to pointer despite it - * being a reality on POSIX). This interface is - * troublesome on platforms where sizeof(void *) - * is not guaranteed to be sizeof(T *). + * Make sure copy is completed with respect to consumer + * update. */ - ck_pr_store_ptr(data, buffer[consumer & mask].value); ck_pr_fence_store(); ck_pr_store_uint(&ring->c_head, consumer + 1); return true; } +CK_CC_INLINE static bool +ck_ring_dequeue_spsc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + void *data) +{ + + return _ck_ring_dequeue_spsc(ring, buffer, + data, sizeof(void *)); +} + /* * Atomically enqueues the specified entry. Returns true on success, returns * false if the ck_ring is full. This operation only support one active @@ -194,7 +229,8 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring, unsigned int *size) { - return ck_ring_enqueue_spsc_size(ring, buffer, entry, size); + return ck_ring_enqueue_spsc_size(ring, buffer, + entry, size); } /* @@ -213,9 +249,10 @@ ck_ring_enqueue_spmc(struct ck_ring *ring, } CK_CC_INLINE static bool -ck_ring_trydequeue_spmc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - void *data) +_ck_ring_trydequeue_spmc(struct ck_ring *ring, + void *restrict buffer, + void *data, + unsigned int size) { unsigned int consumer, producer; unsigned int mask = ring->mask; @@ -228,20 +265,33 @@ ck_ring_trydequeue_spmc(struct ck_ring *ring, return false; ck_pr_fence_load(); - ck_pr_store_ptr(data, buffer[consumer & mask].value); - ck_pr_fence_memory(); + buffer = (char *)buffer + size * (consumer & mask); + memcpy(data, buffer, size); + + ck_pr_fence_store_atomic(); return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1); } CK_CC_INLINE static bool -ck_ring_dequeue_spmc(struct ck_ring *ring, +ck_ring_trydequeue_spmc(struct ck_ring *ring, struct ck_ring_buffer *buffer, void *data) { + + return _ck_ring_trydequeue_spmc(ring, + buffer, data, sizeof(void *)); +} + +CK_CC_INLINE static bool +_ck_ring_dequeue_spmc(struct ck_ring *ring, + void *buffer, + void *data, + unsigned int size) +{ unsigned int consumer, producer; unsigned int mask = ring->mask; - void *r; + char *target; consumer = ck_pr_load_uint(&ring->c_head); @@ -265,23 +315,29 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, * volatile load to force volatile semantics while allowing * for r itself to remain aliased across the loop. */ - r = ck_pr_load_ptr(&buffer[consumer & mask].value); + target = (char *)buffer + size * (consumer & mask); + memcpy(data, target, size); /* Serialize load with respect to head update. */ - ck_pr_fence_memory(); + ck_pr_fence_store_atomic(); } while (ck_pr_cas_uint_value(&ring->c_head, consumer, consumer + 1, &consumer) == false); - /* - * Force spillage while avoiding aliasing issues that aren't - * a problem on POSIX. - */ - ck_pr_store_ptr(data, r); return true; } +CK_CC_INLINE static bool +ck_ring_dequeue_spmc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + void *data) +{ + + return _ck_ring_dequeue_spmc(ring, buffer, data, + sizeof(void *)); +} + CK_CC_INLINE static void ck_ring_init(struct ck_ring *ring, unsigned int size) {