ck_ring: Clean-up internal implementations.

Break out internal implementations to _mp and _sc variants from which
public interface is built on. Do not rely on macro. Adopt CK_CC_RESTRICT
instead of using restrict directly.
ck_pring
Samy Al Bahra 9 years ago
parent 414ba224ca
commit fee4e7187b

@ -69,105 +69,80 @@ ck_ring_capacity(const struct ck_ring *ring)
return ring->size;
}
#define CK_RING_ENQUEUE_SP_DEFINE(ring, buffer, entry, ts, size, P) do {\
unsigned int consumer, producer, delta; \
unsigned int mask = ring->mask; \
\
consumer = ck_pr_load_uint(&ring->c_head); \
producer = ring->p_tail; \
delta = producer + 1; \
if (P) \
*(unsigned int *)size = (producer - consumer) & mask; \
\
if ((delta & mask) == (consumer & mask)) \
return false; \
\
buffer = (char *)buffer + ts * (producer & mask); \
memcpy(buffer, entry, ts); \
\
/* \
* Make sure to update slot value before indicating \
* that the slot is available for consumption. \
*/ \
ck_pr_fence_store(); \
ck_pr_store_uint(&ring->p_tail, delta); \
return true; \
} while (0)
/*
* This variant of ck_ring_enqueue_spsc returns the snapshot of queue length
* with respect to the linearization point. This can be used to extract ring
* size without incurring additional cacheline invalidation overhead from the
* writer.
*/
CK_CC_INLINE static bool
_ck_ring_enqueue_spsc_size(struct ck_ring *ring,
void *restrict buffer,
const void *restrict entry,
unsigned int ts,
unsigned int *size)
CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, unsigned int size)
{
CK_RING_ENQUEUE_SP_DEFINE(ring, buffer, entry, ts,
size, true);
ring->size = size;
ring->mask = size - 1;
ring->p_tail = 0;
ring->p_head = 0;
ring->c_head = 0;
return;
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of a concurrent invocation
* of ck_ring_dequeue_spsc.
* The _ck_ring_* namespace is internal only and must not used externally.
*/
CK_CC_INLINE static bool
_ck_ring_enqueue_spsc(struct ck_ring *ring,
void *restrict destination,
const void *restrict source,
unsigned int ts)
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp(struct ck_ring *ring,
void *CK_CC_RESTRICT buffer,
const void *CK_CC_RESTRICT entry,
unsigned int ts,
unsigned int *size)
{
const unsigned int mask = ring->mask;
unsigned int consumer, producer, delta;
CK_RING_ENQUEUE_SP_DEFINE(ring, destination, source,
ts, NULL, false);
}
consumer = ck_pr_load_uint(&ring->c_head);
producer = ring->p_tail;
delta = producer + 1;
if (size != NULL)
*size = (producer - consumer) & mask;
#undef CK_RING_ENQUEUE_SP_DEFINE
if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
return false;
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry,
unsigned int *size)
{
buffer = (char *)buffer + ts * (producer & mask);
memcpy(buffer, entry, ts);
return _ck_ring_enqueue_spsc_size(ring, buffer, &entry,
sizeof(entry), size);
/*
* Make sure to update slot value before indicating
* that the slot is available for consumption.
*/
ck_pr_fence_store();
ck_pr_store_uint(&ring->p_tail, delta);
return true;
}
CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp_size(struct ck_ring *ring,
void *CK_CC_RESTRICT buffer,
const void *CK_CC_RESTRICT entry,
unsigned int ts,
unsigned int *size)
{
unsigned int sz;
bool r;
return _ck_ring_enqueue_spsc(ring, buffer,
&entry, sizeof(entry));
r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
*size = sz;
return r;
}
/*
* Single consumer and single producer ring buffer dequeue (consumer).
*/
CK_CC_INLINE static bool
_ck_ring_dequeue_spsc(struct ck_ring *ring,
const void *restrict buffer,
void *restrict target,
CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring *ring,
const void *CK_CC_RESTRICT buffer,
void *CK_CC_RESTRICT target,
unsigned int size)
{
const unsigned int mask = ring->mask;
unsigned int consumer, producer;
unsigned int mask = ring->mask;
consumer = ring->c_head;
producer = ck_pr_load_uint(&ring->p_tail);
if (consumer == producer)
if (CK_CC_UNLIKELY(consumer == producer))
return false;
/*
@ -188,67 +163,90 @@ _ck_ring_dequeue_spsc(struct ck_ring *ring,
return true;
}
CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_dequeue_spsc(ring, buffer,
data, sizeof(data));
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of up to UINT_MAX concurrent
* invocations of ck_ring_dequeue_spmc.
*
* This variant of ck_ring_enqueue_spmc returns the snapshot of queue length
* with respect to the linearization point. This can be used to extract ring
* size without incurring additional cacheline invalidation overhead from the
* writer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp(struct ck_ring *ring,
void *buffer,
const void *entry,
unsigned int ts,
unsigned int *size)
{
const unsigned int mask = ring->mask;
unsigned int producer, consumer, delta;
bool r = true;
producer = ck_pr_load_uint(&ring->p_head);
return ck_ring_enqueue_spsc_size(ring, buffer,
entry, size);
do {
/*
* The snapshot of producer must be up to date with
* respect to consumer.
*/
ck_pr_fence_load();
consumer = ck_pr_load_uint(&ring->c_head);
delta = producer + 1;
if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) {
r = false;
goto leave;
}
} while (ck_pr_cas_uint_value(&ring->p_head,
producer,
delta,
&producer) == false);
buffer = (char *)buffer + ts * (producer & mask);
memcpy(buffer, entry, ts);
/*
* Wait until all concurrent producers have completed writing
* their data into the ring buffer.
*/
while (ck_pr_load_uint(&ring->p_tail) != producer)
ck_pr_stall();
/*
* Ensure that copy is completed before updating shared producer
* counter.
*/
ck_pr_fence_store();
ck_pr_store_uint(&ring->p_tail, delta);
leave:
if (size != NULL)
*size = (producer - consumer) & mask;
return r;
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of up to UINT_MAX concurrent
* invocations of ck_ring_dequeue_spmc.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp_size(struct ck_ring *ring,
void *buffer,
const void *entry,
unsigned int ts,
unsigned int *size)
{
unsigned int sz;
bool r;
return ck_ring_enqueue_spsc(ring, buffer, entry);
r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
*size = sz;
return r;
}
CK_CC_INLINE static bool
_ck_ring_trydequeue_spmc(struct ck_ring *ring,
CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring *ring,
const void *buffer,
void *data,
unsigned int size)
{
const unsigned int mask = ring->mask;
unsigned int consumer, producer;
unsigned int mask = ring->mask;
consumer = ck_pr_load_uint(&ring->c_head);
ck_pr_fence_load();
producer = ck_pr_load_uint(&ring->p_tail);
if (consumer == producer)
if (CK_CC_UNLIKELY(consumer == producer))
return false;
ck_pr_fence_load();
@ -260,18 +258,8 @@ _ck_ring_trydequeue_spmc(struct ck_ring *ring,
return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}
CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_trydequeue_spmc(ring,
buffer, data, sizeof(data));
}
CK_CC_INLINE static bool
_ck_ring_dequeue_spmc(struct ck_ring *ring,
CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_mc(struct ck_ring *ring,
const void *buffer,
void *data,
unsigned int ts)
@ -291,7 +279,7 @@ _ck_ring_dequeue_spmc(struct ck_ring *ring,
ck_pr_fence_load();
producer = ck_pr_load_uint(&ring->p_tail);
if (consumer == producer)
if (CK_CC_UNLIKELY(consumer == producer))
return false;
ck_pr_fence_load();
@ -309,168 +297,166 @@ _ck_ring_dequeue_spmc(struct ck_ring *ring,
return true;
}
/*
* The ck_ring_*_spsc namespace is the public interface for interacting with a
* ring buffer containing pointers. Correctness is only provided if there is up
* to one concurrent consumer and up to one concurrent producer.
*/
CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry,
unsigned int *size)
{
return _ck_ring_dequeue_spmc(ring, buffer, data,
sizeof(*buffer));
return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
sizeof(entry), size);
}
#define CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, size, P) do {\
const unsigned int mask = ring->mask; \
unsigned int producer, consumer, delta; \
bool r = true; \
\
producer = ck_pr_load_uint(&ring->p_head); \
\
do { \
/* \
* The snapshot of producer must be up to date with \
* respect to consumer. \
*/ \
ck_pr_fence_load(); \
consumer = ck_pr_load_uint(&ring->c_head); \
\
delta = producer + 1; \
if ((delta & mask) == (consumer & mask)) { \
r = false; \
goto leave; \
} \
} while (ck_pr_cas_uint_value(&ring->p_head, \
producer, \
delta, \
&producer) == false); \
\
buffer = (char *)buffer + ts * (producer & mask); \
memcpy(buffer, entry, ts); \
\
/* \
* Wait until all concurrent producers have completed writing \
* their data into the ring buffer. \
*/ \
while (ck_pr_load_uint(&ring->p_tail) != producer) \
ck_pr_stall(); \
\
/* \
* Ensure that copy is completed before updating shared producer\
* counter. \
*/ \
ck_pr_fence_store(); \
ck_pr_store_uint(&ring->p_tail, delta); \
\
leave: \
if (P) \
*(unsigned int *)size = (producer - consumer) & mask; \
\
return r; \
} while (0)
CK_CC_INLINE static bool
_ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
void *buffer,
const void *entry,
unsigned int ts,
unsigned int *size)
ck_ring_enqueue_spsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
{
CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, size, true);
return _ck_ring_enqueue_sp(ring, buffer,
&entry, sizeof(entry), NULL);
}
CK_CC_INLINE static bool
_ck_ring_enqueue_mpsc(struct ck_ring *ring,
void *buffer,
const void *entry,
unsigned int ts)
ck_ring_dequeue_spsc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, NULL, false);
return _ck_ring_dequeue_sc(ring, buffer,
data, sizeof(data));
}
#undef CK_RING_ENQUEUE_MP_DEFINE
/*
* The ck_ring_*_mpmc namespace is the public interface for interacting with a
* ring buffer containing pointers. Correctness is provided for any number of
* producers and consumers.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring *ring,
ck_ring_enqueue_mpmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
{
return _ck_ring_enqueue_mpsc(ring, buffer, &entry, sizeof(entry));
return _ck_ring_enqueue_mp(ring, buffer, &entry,
sizeof(entry), NULL);
}
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry,
unsigned int *size)
{
return _ck_ring_enqueue_mpsc_size(ring, buffer, &entry,
return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
sizeof(entry), size);
}
CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring *ring,
ck_ring_trydequeue_mpmc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_dequeue_spsc(ring, buffer, data,
sizeof(data));
return _ck_ring_trydequeue_mc(ring,
buffer, data, sizeof(data));
}
CK_CC_INLINE static bool
ck_ring_enqueue_mpmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
ck_ring_dequeue_mpmc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_enqueue_mpsc(ring, buffer, &entry, sizeof(entry));
return _ck_ring_dequeue_mc(ring, buffer, data,
sizeof(data));
}
/*
* The ck_ring_*_spmc namespace is the public interface for interacting with a
* ring buffer containing pointers. Correctness is provided for any number of
* consumers with up to one concurrent producer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry,
unsigned int *size)
{
return _ck_ring_enqueue_mpsc_size(ring, buffer, &entry,
return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
sizeof(entry), size);
}
CK_CC_INLINE static bool
ck_ring_trydequeue_mpmc(struct ck_ring *ring,
ck_ring_enqueue_spmc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
{
return _ck_ring_enqueue_sp(ring, buffer, &entry,
sizeof(entry), NULL);
}
CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_trydequeue_spmc(ring,
buffer, data, sizeof(data));
return _ck_ring_trydequeue_mc(ring, buffer, data, sizeof(data));
}
CK_CC_INLINE static bool
ck_ring_dequeue_mpmc(struct ck_ring *ring,
ck_ring_dequeue_spmc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_dequeue_spmc(ring, buffer, data,
sizeof(data));
return _ck_ring_dequeue_mc(ring, buffer, data, sizeof(data));
}
CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, unsigned int size)
/*
* The ck_ring_*_mpsc namespace is the public interface for interacting with a
* ring buffer containing pointers. Correctness is provided for any number of
* producers with up to one concurrent consumers.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry)
{
ring->size = size;
ring->mask = size - 1;
ring->p_tail = 0;
ring->p_head = 0;
ring->c_head = 0;
return;
return _ck_ring_enqueue_mp(ring, buffer, entry,
sizeof(entry), NULL);
}
CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
struct ck_ring_buffer *buffer,
const void *entry,
unsigned int *size)
{
return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
sizeof(entry), size);
}
CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
void *data)
{
return _ck_ring_dequeue_sc(ring, buffer, data,
sizeof(data));
}
#define CK_RING_PROTOTYPE(name, type) \
@ -481,7 +467,7 @@ ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \
unsigned int *d) \
{ \
\
return _ck_ring_enqueue_spsc_size(a, b, c, \
return _ck_ring_enqueue_sp_size(a, b, c, \
sizeof(struct type), d); \
} \
\
@ -491,8 +477,8 @@ ck_ring_enqueue_spsc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_enqueue_spsc(a, b, c, \
sizeof(struct type)); \
return _ck_ring_enqueue_sp(a, b, c, \
sizeof(struct type), NULL); \
} \
\
CK_CC_INLINE static bool \
@ -501,7 +487,7 @@ ck_ring_dequeue_spsc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_dequeue_spsc(a, b, c, \
return _ck_ring_dequeue_sc(a, b, c, \
sizeof(struct type)); \
} \
\
@ -512,7 +498,7 @@ ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \
unsigned int *d) \
{ \
\
return _ck_ring_enqueue_spsc_size(a, b, c, \
return _ck_ring_enqueue_sp_size(a, b, c, \
sizeof(struct type), d); \
} \
\
@ -522,8 +508,8 @@ ck_ring_enqueue_spmc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_enqueue_spsc(a, b, c, \
sizeof(struct type)); \
return _ck_ring_enqueue_sp(a, b, c, \
sizeof(struct type), NULL); \
} \
\
CK_CC_INLINE static bool \
@ -532,7 +518,7 @@ ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_trydequeue_spmc(a, \
return _ck_ring_trydequeue_mc(a, \
b, c, sizeof(struct type)); \
} \
\
@ -542,7 +528,7 @@ ck_ring_dequeue_spmc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_dequeue_spmc(a, b, c, \
return _ck_ring_dequeue_mc(a, b, c, \
sizeof(struct type)); \
} \
\
@ -552,8 +538,8 @@ ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_enqueue_mpsc(a, b, c, \
sizeof(struct type)); \
return _ck_ring_enqueue_mp(a, b, c, \
sizeof(struct type), NULL); \
} \
\
CK_CC_INLINE static bool \
@ -563,7 +549,7 @@ ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \
unsigned int *d) \
{ \
\
return _ck_ring_enqueue_mpsc_size(a, b, c, \
return _ck_ring_enqueue_mp_size(a, b, c, \
sizeof(struct type), d); \
} \
\
@ -573,7 +559,7 @@ ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_dequeue_spsc(a, b, c, \
return _ck_ring_dequeue_sc(a, b, c, \
sizeof(struct type)); \
} \
\
@ -584,7 +570,7 @@ ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \
unsigned int *d) \
{ \
\
return _ck_ring_enqueue_mpsc_size(a, b, c, \
return _ck_ring_enqueue_mp_size(a, b, c, \
sizeof(struct type), d); \
} \
\
@ -594,8 +580,8 @@ ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_enqueue_mpsc(a, b, c, \
sizeof(struct type)); \
return _ck_ring_enqueue_mp(a, b, c, \
sizeof(struct type), NULL); \
} \
\
CK_CC_INLINE static bool \
@ -604,7 +590,7 @@ ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_trydequeue_spmc(a, \
return _ck_ring_trydequeue_mc(a, \
b, c, sizeof(struct type)); \
} \
\
@ -614,7 +600,7 @@ ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \
struct type *c) \
{ \
\
return _ck_ring_dequeue_spmc(a, b, c, \
return _ck_ring_dequeue_mc(a, b, c, \
sizeof(struct type)); \
}

Loading…
Cancel
Save