ck_ring: Add *_size_* ck_ring enqueue operations.

These variants of ck_ring_enqueue_* return the snapshot of queue
length with respect to the linearization point. This can be used to
extract ring size without incurring additional cacheline invalidation
overhead from the writer.
ck_pring
Samy Al Bahra 12 years ago
parent 86884ed574
commit 45f648bd33

@ -75,6 +75,27 @@
return ring->size; \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spsc_size_##name(struct ck_ring_##name *ring, \
struct type *entry, \
unsigned int *size) \
{ \
unsigned int consumer, producer, delta; \
unsigned int mask = ring->mask; \
\
consumer = ck_pr_load_uint(&ring->c_head); \
producer = ring->p_tail; \
delta = producer + 1; \
*size = (producer - consumer) & mask; \
\
if ((delta & mask) == (consumer & mask)) \
return false; \
\
ring->ring[producer & mask] = *entry; \
ck_pr_fence_store(); \
ck_pr_store_uint(&ring->p_tail, delta); \
return true; \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spsc_##name(struct ck_ring_##name *ring, \
struct type *entry) \
{ \
@ -114,6 +135,13 @@
return true; \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spmc_size_##name(struct ck_ring_##name *ring, \
void *entry, unsigned int *size) \
{ \
\
return ck_ring_enqueue_spsc_size_##name(ring, entry, size); \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spmc_##name(struct ck_ring_##name *ring, void *entry) \
{ \
\
@ -217,6 +245,50 @@ ck_ring_capacity(struct ck_ring *ring)
return ring->size;
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of a concurrent invocation
* of ck_ring_dequeue_spsc.
*
* This variant of ck_ring_enqueue_spsc returns the snapshot of queue length
* with respect to the linearization point. This can be used to extract ring
* size without incurring additional cacheline invalidation overhead from the
* writer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
void *entry,
unsigned int *size)
{
unsigned int consumer, producer, delta;
unsigned int mask = ring->mask;
consumer = ck_pr_load_uint(&ring->c_head);
producer = ring->p_tail;
delta = producer + 1;
*size = (producer - consumer) & mask;
if ((delta & mask) == (consumer & mask))
return false;
ring->ring[producer & mask] = entry;
/*
* Make sure to update slot value before indicating
* that the slot is available for consumption.
*/
ck_pr_fence_store();
ck_pr_store_uint(&ring->p_tail, delta);
return true;
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of a concurrent invocation
* of ck_ring_dequeue_spsc.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry)
{
@ -275,6 +347,32 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, void *data)
return true;
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of up to UINT_MAX concurrent
* invocations of ck_ring_dequeue_spmc.
*
* This variant of ck_ring_enqueue_spmc returns the snapshot of queue length
* with respect to the linearization point. This can be used to extract ring
* size without incurring additional cacheline invalidation overhead from the
* writer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
void *entry,
unsigned int *size)
{
return ck_ring_enqueue_spsc_size(ring, entry, size);
}
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
* invocation at a time and works in the presence of up to UINT_MAX concurrent
* invocations of ck_ring_dequeue_spmc.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring *ring, void *entry)
{

Loading…
Cancel
Save