ck_ring: Type-agnostic backend implementation.

This will serve to drive both pointer-based and type-specialized ring
implementations. Some full fences have also been removed as they are
unnecessary with the introduction of X_Y fence interface.
ck_pring
Samy Al Bahra 11 years ago
parent 92d662fa5e
commit 90c9950356

@ -81,9 +81,10 @@ ck_ring_capacity(struct ck_ring *ring)
* writer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *entry,
_ck_ring_enqueue_spsc_size(struct ck_ring *ring,
void *restrict buffer,
const void *restrict entry,
unsigned int type_size,
unsigned int *size)
{
unsigned int consumer, producer, delta;
@ -97,7 +98,8 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring,
if ((delta & mask) == (consumer & mask))
return false;
buffer[producer & mask].value = entry;
buffer = (char *)buffer + type_size * (producer & mask);
memcpy(buffer, entry, type_size);
/*
* Make sure to update slot value before indicating
@ -108,6 +110,17 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring,
return true;
}
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *entry,
unsigned int *size)
{
return _ck_ring_enqueue_spsc_size(ring, buffer, &entry,
sizeof(void *), size);
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
@ -115,9 +128,10 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring,
* of ck_ring_dequeue_spsc.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *entry)
_ck_ring_enqueue_spsc(struct ck_ring *ring,
void *restrict destination,
const void *restrict source,
unsigned int size)
{
unsigned int consumer, producer, delta;
unsigned int mask = ring->mask;
@ -129,7 +143,8 @@ ck_ring_enqueue_spsc(struct ck_ring *ring,
if ((delta & mask) == (consumer & mask))
return false;
buffer[producer & mask].value = entry;
destination = (char *)destination + size * (producer & mask);
memcpy(destination, source, size);
/*
* Make sure to update slot value before indicating
@ -140,13 +155,24 @@ ck_ring_enqueue_spsc(struct ck_ring *ring,
return true;
}
CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
{
return _ck_ring_enqueue_spsc(ring, buffer,
&entry, sizeof(entry));
}
/*
* Single consumer and single producer ring buffer dequeue (consumer).
*/
CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *data)
_ck_ring_dequeue_spsc(struct ck_ring *ring,
void *restrict buffer,
void *restrict target,
unsigned int size)
{
unsigned int consumer, producer;
unsigned int mask = ring->mask;
@ -163,19 +189,28 @@ ck_ring_dequeue_spsc(struct ck_ring *ring,
*/
ck_pr_fence_load();
buffer = (char *)buffer + size * (consumer & mask);
memcpy(target, buffer, size);
/*
* This is used to work-around aliasing issues (C
* lacks a generic pointer to pointer despite it
* being a reality on POSIX). This interface is
* troublesome on platforms where sizeof(void *)
* is not guaranteed to be sizeof(T *).
* Make sure copy is completed with respect to consumer
* update.
*/
ck_pr_store_ptr(data, buffer[consumer & mask].value);
ck_pr_fence_store();
ck_pr_store_uint(&ring->c_head, consumer + 1);
return true;
}
CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_dequeue_spsc(ring, buffer,
data, sizeof(void *));
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
@ -194,7 +229,8 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring,
unsigned int *size)
{
return ck_ring_enqueue_spsc_size(ring, buffer, entry, size);
return ck_ring_enqueue_spsc_size(ring, buffer,
entry, size);
}
/*
@ -213,9 +249,10 @@ ck_ring_enqueue_spmc(struct ck_ring *ring,
}
CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *data)
_ck_ring_trydequeue_spmc(struct ck_ring *ring,
void *restrict buffer,
void *data,
unsigned int size)
{
unsigned int consumer, producer;
unsigned int mask = ring->mask;
@ -228,20 +265,33 @@ ck_ring_trydequeue_spmc(struct ck_ring *ring,
return false;
ck_pr_fence_load();
ck_pr_store_ptr(data, buffer[consumer & mask].value);
ck_pr_fence_memory();
buffer = (char *)buffer + size * (consumer & mask);
memcpy(data, buffer, size);
ck_pr_fence_store_atomic();
return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}
CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring,
ck_ring_trydequeue_spmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_trydequeue_spmc(ring,
buffer, data, sizeof(void *));
}
CK_CC_INLINE static bool
_ck_ring_dequeue_spmc(struct ck_ring *ring,
void *buffer,
void *data,
unsigned int size)
{
unsigned int consumer, producer;
unsigned int mask = ring->mask;
void *r;
char *target;
consumer = ck_pr_load_uint(&ring->c_head);
@ -265,23 +315,29 @@ ck_ring_dequeue_spmc(struct ck_ring *ring,
* volatile load to force volatile semantics while allowing
* for r itself to remain aliased across the loop.
*/
r = ck_pr_load_ptr(&buffer[consumer & mask].value);
target = (char *)buffer + size * (consumer & mask);
memcpy(data, target, size);
/* Serialize load with respect to head update. */
ck_pr_fence_memory();
ck_pr_fence_store_atomic();
} while (ck_pr_cas_uint_value(&ring->c_head,
consumer,
consumer + 1,
&consumer) == false);
/*
* Force spillage while avoiding aliasing issues that aren't
* a problem on POSIX.
*/
ck_pr_store_ptr(data, r);
return true;
}
CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_dequeue_spmc(ring, buffer, data,
sizeof(void *));
}
CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, unsigned int size)
{

Loading…
Cancel
Save