diff --git a/include/ck_ring.h b/include/ck_ring.h index e5f0712..6e0202c 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -84,6 +84,45 @@ ck_ring_init(struct ck_ring *ring, unsigned int size) /* * The _ck_ring_* namespace is internal only and must not used externally. */ + +/* + * This function will return a region of memory to write for the next value + * for a single producer. + */ +CK_CC_FORCE_INLINE static void * +_ck_ring_enqueue_reserve_sp(struct ck_ring *ring, + void *CK_CC_RESTRICT buffer, + unsigned int ts, + unsigned int *size) +{ + const unsigned int mask = ring->mask; + unsigned int consumer, producer, delta; + + consumer = ck_pr_load_uint(&ring->c_head); + producer = ring->p_tail; + delta = producer + 1; + if (size != NULL) + *size = (producer - consumer) & mask; + + if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask))) + return NULL; + + return (char *)buffer + ts * (producer & mask); +} + +/* + * This is to be called to commit and make visible a region of previously + * reserved with reverse_sp. + */ +CK_CC_FORCE_INLINE static void +_ck_ring_enqueue_commit_sp(struct ck_ring *ring) +{ + + ck_pr_fence_store(); + ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1); + return; +} + CK_CC_FORCE_INLINE static bool _ck_ring_enqueue_sp(struct ck_ring *ring, void *CK_CC_RESTRICT buffer, @@ -163,6 +202,65 @@ _ck_ring_dequeue_sc(struct ck_ring *ring, return true; } +CK_CC_FORCE_INLINE static void * +_ck_ring_enqueue_reserve_mp(struct ck_ring *ring, + void *buffer, + unsigned int ts, + unsigned int *ticket, + unsigned int *size) +{ + const unsigned int mask = ring->mask; + unsigned int producer, consumer, delta; + + producer = ck_pr_load_uint(&ring->p_head); + + for (;;) { + ck_pr_fence_load(); + consumer = ck_pr_load_uint(&ring->c_head); + + delta = producer + 1; + + if (CK_CC_LIKELY((producer - consumer) < mask)) { + if (ck_pr_cas_uint_value(&ring->p_head, + producer, delta, &producer) == true) { + break; + } + } else { + unsigned int new_producer; + + ck_pr_fence_load(); + new_producer = ck_pr_load_uint(&ring->p_head); + + if (producer == new_producer) { + if (size != NULL) + *size = (producer - consumer) & mask; + + return false; + } + + producer = new_producer; + } + } + + *ticket = producer; + if (size != NULL) + *size = (producer - consumer) & mask; + + return (char *)buffer + ts * (producer & mask); +} + +CK_CC_FORCE_INLINE static void +_ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer) +{ + + while (ck_pr_load_uint(&ring->p_tail) != producer) + ck_pr_stall(); + + ck_pr_fence_store(); + ck_pr_store_uint(&ring->p_tail, producer + 1); + return; +} + CK_CC_FORCE_INLINE static bool _ck_ring_enqueue_mp(struct ck_ring *ring, void *buffer, @@ -354,6 +452,33 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, &entry, sizeof(entry), NULL); } +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + unsigned int *size) +{ + + return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), + size); +} + +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_spsc(struct ck_ring *ring, + struct ck_ring_buffer *buffer) +{ + + return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), + NULL); +} + +CK_CC_INLINE static void +ck_ring_enqueue_commit_spsc(struct ck_ring *ring) +{ + + _ck_ring_enqueue_commit_sp(ring); + return; +} + CK_CC_INLINE static bool ck_ring_dequeue_spsc(struct ck_ring *ring, const struct ck_ring_buffer *buffer, @@ -375,8 +500,7 @@ ck_ring_enqueue_mpmc(struct ck_ring *ring, const void *entry) { - return _ck_ring_enqueue_mp(ring, buffer, &entry, - sizeof(entry), NULL); + return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(entry), NULL); } CK_CC_INLINE static bool @@ -386,8 +510,37 @@ ck_ring_enqueue_mpmc_size(struct ck_ring *ring, unsigned int *size) { - return _ck_ring_enqueue_mp_size(ring, buffer, &entry, - sizeof(entry), size); + return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry), + size); +} + +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + unsigned int *ticket) +{ + + return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), + ticket, NULL); +} + +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + unsigned int *ticket, + unsigned int *size) +{ + + return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), + ticket, size); +} + +CK_CC_INLINE static void +ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket) +{ + + _ck_ring_enqueue_commit_mp(ring, ticket); + return; } CK_CC_INLINE static bool @@ -415,6 +568,31 @@ ck_ring_dequeue_mpmc(struct ck_ring *ring, * ring buffer containing pointers. Correctness is provided for any number of * consumers with up to one concurrent producer. */ +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + unsigned int *size) +{ + + return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size); +} + +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_spmc(struct ck_ring *ring, + struct ck_ring_buffer *buffer) +{ + + return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL); +} + +CK_CC_INLINE static void +ck_ring_enqueue_commit_spmc(struct ck_ring *ring) +{ + + _ck_ring_enqueue_commit_sp(ring); + return; +} + CK_CC_INLINE static bool ck_ring_enqueue_spmc_size(struct ck_ring *ring, struct ck_ring_buffer *buffer, @@ -459,6 +637,35 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, * ring buffer containing pointers. Correctness is provided for any number of * producers with up to one concurrent consumers. */ +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + unsigned int *ticket) +{ + + return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), + ticket, NULL); +} + +CK_CC_INLINE static void * +ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + unsigned int *ticket, + unsigned int *size) +{ + + return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *), + ticket, size); +} + +CK_CC_INLINE static void +ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket) +{ + + _ck_ring_enqueue_commit_mp(ring, ticket); + return; +} + CK_CC_INLINE static bool ck_ring_enqueue_mpsc(struct ck_ring *ring, struct ck_ring_buffer *buffer, @@ -494,194 +701,290 @@ ck_ring_dequeue_mpsc(struct ck_ring *ring, * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining * values of a particular type in the ring the buffer. */ -#define CK_RING_PROTOTYPE(name, type) \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c, \ - unsigned int *d) \ -{ \ - \ - return _ck_ring_enqueue_sp_size(a, b, c, \ - sizeof(struct type), d); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_spsc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_enqueue_sp(a, b, c, \ - sizeof(struct type), NULL); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_dequeue_spsc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_dequeue_sc(a, b, c, \ - sizeof(struct type)); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c, \ - unsigned int *d) \ -{ \ - \ - return _ck_ring_enqueue_sp_size(a, b, c, \ - sizeof(struct type), d); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_spmc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_enqueue_sp(a, b, c, \ - sizeof(struct type), NULL); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_trydequeue_mc(a, \ - b, c, sizeof(struct type)); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_dequeue_mc(a, b, c, \ - sizeof(struct type)); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_enqueue_mp(a, b, c, \ - sizeof(struct type), NULL); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c, \ - unsigned int *d) \ -{ \ - \ - return _ck_ring_enqueue_mp_size(a, b, c, \ - sizeof(struct type), d); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_dequeue_sc(a, b, c, \ - sizeof(struct type)); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c, \ - unsigned int *d) \ -{ \ - \ - return _ck_ring_enqueue_mp_size(a, b, c, \ - sizeof(struct type), d); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_enqueue_mp(a, b, c, \ - sizeof(struct type), NULL); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_trydequeue_mc(a, \ - b, c, sizeof(struct type)); \ -} \ - \ -CK_CC_INLINE static bool \ -ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \ - struct type *b, \ - struct type *c) \ -{ \ - \ - return _ck_ring_dequeue_mc(a, b, c, \ - sizeof(struct type)); \ +#define CK_RING_PROTOTYPE(name, type) \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a, \ + struct type *b) \ +{ \ + \ + return _ck_ring_enqueue_reserve_sp(a, b, \ + sizeof(struct type), NULL); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a, \ + struct type *b, \ + unsigned int *c) \ +{ \ + \ + return _ck_ring_enqueue_reserve_sp(a, b, \ + sizeof(struct type), c); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_sp_size(a, b, c, \ + sizeof(struct type), d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_spsc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_enqueue_sp(a, b, c, \ + sizeof(struct type), NULL); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_dequeue_spsc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_dequeue_sc(a, b, c, \ + sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a, \ + struct type *b) \ +{ \ + \ + return _ck_ring_enqueue_reserve_sp(a, b, \ + sizeof(struct type), NULL); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a, \ + struct type *b, \ + unsigned int *c) \ +{ \ + \ + return _ck_ring_enqueue_reserve_sp(a, b, \ + sizeof(struct type), c); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_sp_size(a, b, c, \ + sizeof(struct type), d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_spmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_enqueue_sp(a, b, c, \ + sizeof(struct type), NULL); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_trydequeue_mc(a, \ + b, c, sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_dequeue_mc(a, b, c, \ + sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a, \ + struct type *b, \ + unsigned int *c) \ +{ \ + \ + return _ck_ring_enqueue_reserve_mp(a, b, \ + sizeof(struct type), c, NULL); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a, \ + struct type *b, \ + unsigned int *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_reserve_mp(a, b, \ + sizeof(struct type), c, d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_enqueue_mp(a, b, c, \ + sizeof(struct type), NULL); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_mp_size(a, b, c, \ + sizeof(struct type), d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_dequeue_sc(a, b, c, \ + sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + unsigned int *c) \ +{ \ + \ + return _ck_ring_enqueue_reserve_mp(a, b, \ + sizeof(struct type), c, NULL); \ +} \ + \ +CK_CC_INLINE static struct type * \ +ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a, \ + struct type *b, \ + unsigned int *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_reserve_mp(a, b, \ + sizeof(struct type), c, d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_mp_size(a, b, c, \ + sizeof(struct type), d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_enqueue_mp(a, b, c, \ + sizeof(struct type), NULL); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_trydequeue_mc(a, \ + b, c, sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_dequeue_mc(a, b, c, \ + sizeof(struct type)); \ } /* * A single producer with one concurrent consumer. */ -#define CK_RING_ENQUEUE_SPSC(name, a, b, c) \ +#define CK_RING_ENQUEUE_SPSC(name, a, b, c) \ ck_ring_enqueue_spsc_##name(a, b, c) -#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \ +#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \ ck_ring_enqueue_spsc_size_##name(a, b, c, d) -#define CK_RING_DEQUEUE_SPSC(name, a, b, c) \ +#define CK_RING_ENQUEUE_RESERVE_SPSC(name, a, b, c) \ + ck_ring_enqueue_reserve_spsc_##name(a, b, c) +#define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, a, b, c, d) \ + ck_ring_enqueue_reserve_spsc_size_##name(a, b, c, d) +#define CK_RING_DEQUEUE_SPSC(name, a, b, c) \ ck_ring_dequeue_spsc_##name(a, b, c) /* * A single producer with any number of concurrent consumers. */ -#define CK_RING_ENQUEUE_SPMC(name, a, b, c) \ +#define CK_RING_ENQUEUE_SPMC(name, a, b, c) \ ck_ring_enqueue_spmc_##name(a, b, c) -#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \ +#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \ ck_ring_enqueue_spmc_size_##name(a, b, c, d) -#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \ +#define CK_RING_ENQUEUE_RESERVE_SPMC(name, a, b, c) \ + ck_ring_enqueue_reserve_spmc_##name(a, b, c) +#define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, a, b, c, d) \ + ck_ring_enqueue_reserve_spmc_size_##name(a, b, c, d) +#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \ ck_ring_trydequeue_spmc_##name(a, b, c) -#define CK_RING_DEQUEUE_SPMC(name, a, b, c) \ +#define CK_RING_DEQUEUE_SPMC(name, a, b, c) \ ck_ring_dequeue_spmc_##name(a, b, c) /* * Any number of concurrent producers with up to one * concurrent consumer. */ -#define CK_RING_ENQUEUE_MPSC(name, a, b, c) \ +#define CK_RING_ENQUEUE_MPSC(name, a, b, c) \ ck_ring_enqueue_mpsc_##name(a, b, c) -#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \ +#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \ ck_ring_enqueue_mpsc_size_##name(a, b, c, d) -#define CK_RING_DEQUEUE_MPSC(name, a, b, c) \ +#define CK_RING_ENQUEUE_RESERVE_MPSC(name, a, b, c) \ + ck_ring_enqueue_reserve_mpsc_##name(a, b, c) +#define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, a, b, c, d) \ + ck_ring_enqueue_reserve_mpsc_size_##name(a, b, c, d) +#define CK_RING_DEQUEUE_MPSC(name, a, b, c) \ ck_ring_dequeue_mpsc_##name(a, b, c) /* * Any number of concurrent producers and consumers. */ -#define CK_RING_ENQUEUE_MPMC(name, a, b, c) \ +#define CK_RING_ENQUEUE_MPMC(name, a, b, c) \ ck_ring_enqueue_mpmc_##name(a, b, c) -#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \ +#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \ ck_ring_enqueue_mpmc_size_##name(a, b, c, d) -#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \ +#define CK_RING_ENQUEUE_RESERVE_MPMC(name, a, b, c) \ + ck_ring_enqueue_reserve_mpmc_##name(a, b, c) +#define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, a, b, c, d) \ + ck_ring_enqueue_reserve_mpmc_size_##name(a, b, c, d) +#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \ ck_ring_trydequeue_mpmc_##name(a, b, c) -#define CK_RING_DEQUEUE_MPMC(name, a, b, c) \ +#define CK_RING_DEQUEUE_MPMC(name, a, b, c) \ ck_ring_dequeue_mpmc_##name(a, b, c) #endif /* CK_RING_H */