ck_ring: Prefer treatment of ck_ring_buffer_t as an opaque type.

Samy Al Bahra 12 years ago
parent b6f085a62e
commit b6a1914085
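The effect on callers: the ring's storage is no longer a raw void ** smuggled through a by-value ck_ring_buffer_t; callers now allocate an array of struct ck_ring_buffer slots and pass it by pointer, so the slot layout stays opaque to them. A minimal before/after sketch of the calling convention (the slot count, variable names, and initializer are illustrative only, not part of this commit):

        /* Before this commit: buffer passed by value, exposing raw storage. */
        ck_ring_buffer_t buf = { .ring = storage }; /* storage: caller's void ** */
        ck_ring_enqueue_spsc(&ring, buf, entry);

        /* After: an array of opaque slots, passed by pointer. */
        struct ck_ring_buffer slots[SIZE]; /* SIZE: ring capacity, a power of two */
        ck_ring_enqueue_spsc(&ring, slots, entry);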

@@ -47,6 +47,11 @@ struct ck_ring {
 };
 typedef struct ck_ring ck_ring_t;
 
+struct ck_ring_buffer {
+        void *value;
+};
+typedef struct ck_ring_buffer ck_ring_buffer_t;
+
 CK_CC_INLINE static unsigned int
 ck_ring_size(struct ck_ring *ring)
 {
@@ -64,11 +69,6 @@ ck_ring_capacity(struct ck_ring *ring)
         return ring->size;
 }
 
-struct ck_ring_buffer {
-        void *ring;
-};
-typedef struct ck_ring_buffer ck_ring_buffer_t;
-
 /*
  * Atomically enqueues the specified entry. Returns true on success, returns
  * false if the ck_ring is full. This operation only support one active
@@ -81,13 +81,13 @@ typedef struct ck_ring_buffer ck_ring_buffer_t;
  * writer.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
+ck_ring_enqueue_spsc_size(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
     void *entry,
     unsigned int *size)
 {
         unsigned int consumer, producer, delta;
         unsigned int mask = ring->mask;
-        void **ring_buf = buf.ring;
 
         consumer = ck_pr_load_uint(&ring->c_head);
         producer = ring->p_tail;
@@ -97,7 +97,7 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
         if ((delta & mask) == (consumer & mask))
                 return false;
 
-        ring_buf[producer & mask] = entry;
+        buffer[producer & mask].value = entry;
 
         /*
          * Make sure to update slot value before indicating
@@ -115,11 +115,12 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
  * of ck_ring_dequeue_spsc.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
+ck_ring_enqueue_spsc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *entry)
 {
         unsigned int consumer, producer, delta;
         unsigned int mask = ring->mask;
-        void **ring_buf = buf.ring;
 
         consumer = ck_pr_load_uint(&ring->c_head);
         producer = ring->p_tail;
@@ -128,7 +129,7 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
         if ((delta & mask) == (consumer & mask))
                 return false;
 
-        ring_buf[producer & mask] = entry;
+        buffer[producer & mask].value = entry;
 
         /*
          * Make sure to update slot value before indicating
@@ -143,11 +144,12 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
  * Single consumer and single producer ring buffer dequeue (consumer).
  */
 CK_CC_INLINE static bool
-ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
+ck_ring_dequeue_spsc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *data)
 {
         unsigned int consumer, producer;
         unsigned int mask = ring->mask;
-        void **ring_buf = buf.ring;
 
         consumer = ring->c_head;
         producer = ck_pr_load_uint(&ring->p_tail);
@@ -168,7 +170,7 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
          * troublesome on platforms where sizeof(void *)
          * is not guaranteed to be sizeof(T *).
          */
-        ck_pr_store_ptr(data, ring_buf[consumer & mask]);
+        ck_pr_store_ptr(data, buffer[consumer & mask].value);
         ck_pr_fence_store();
         ck_pr_store_uint(&ring->c_head, consumer + 1);
         return true;
@@ -186,12 +188,13 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
  * writer.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spmc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
+ck_ring_enqueue_spmc_size(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
     void *entry,
     unsigned int *size)
 {
 
-        return ck_ring_enqueue_spsc_size(ring, buf, entry, size);
+        return ck_ring_enqueue_spsc_size(ring, buffer, entry, size);
 }
 
 /*
@@ -201,18 +204,21 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
  * invocations of ck_ring_dequeue_spmc.
  */
 CK_CC_INLINE static bool
-ck_ring_enqueue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
+ck_ring_enqueue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *entry)
 {
 
-        return ck_ring_enqueue_spsc(ring, buf, entry);
+        return ck_ring_enqueue_spsc(ring, buffer, entry);
 }
 
 CK_CC_INLINE static bool
-ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
+ck_ring_trydequeue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *data)
 {
         unsigned int consumer, producer;
         unsigned int mask = ring->mask;
-        void **ring_buf = buf.ring;
 
         consumer = ck_pr_load_uint(&ring->c_head);
         ck_pr_fence_load();
@@ -222,19 +228,20 @@ ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
                 return false;
 
         ck_pr_fence_load();
 
-        ck_pr_store_ptr(data, ring_buf[consumer & mask]);
+        ck_pr_store_ptr(data, buffer[consumer & mask].value);
         ck_pr_fence_memory();
 
         return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
 }
 
 CK_CC_INLINE static bool
-ck_ring_dequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
+ck_ring_dequeue_spmc(struct ck_ring *ring,
+    struct ck_ring_buffer *buffer,
+    void *data)
 {
         unsigned int consumer, producer;
         unsigned int mask = ring->mask;
         void *r;
-        void **ring_buf = buf.ring;
 
         consumer = ck_pr_load_uint(&ring->c_head);
@@ -258,7 +265,7 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
                  * volatile load to force volatile semantics while allowing
                  * for r itself to remain aliased across the loop.
                  */
-                r = ck_pr_load_ptr(&ring_buf[consumer & mask]);
+                r = ck_pr_load_ptr(&buffer[consumer & mask].value);
 
                 /* Serialize load with respect to head update. */
                 ck_pr_fence_memory();
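
For context, a minimal single-producer/single-consumer usage sketch against the new signatures (the slot count, the produce/consume wrappers, and the ck_pr_stall() busy-wait are illustrative assumptions, not part of this commit; ring initialization is elided):

        #define SLOTS 256 /* capacity must be a power of two for the mask math */

        static ck_ring_t ring;                     /* initialized elsewhere */
        static struct ck_ring_buffer slots[SLOTS]; /* caller-owned opaque slots */

        static void
        produce(void *entry)
        {

                while (ck_ring_enqueue_spsc(&ring, slots, entry) == false)
                        ck_pr_stall(); /* ring full: wait on the consumer */

                return;
        }

        static void *
        consume(void)
        {
                void *entry;

                /* Passing &entry as void * matches the ck_pr_store_ptr
                 * convention described in the dequeue comment above. */
                while (ck_ring_dequeue_spsc(&ring, slots, &entry) == false)
                        ck_pr_stall(); /* ring empty: wait on the producer */

                return entry;
        }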
