From f9eb3fb479886c12708fccf69e01fb98de895e71 Mon Sep 17 00:00:00 2001 From: Paul Khuong Date: Sun, 17 Sep 2017 16:40:13 -0400 Subject: [PATCH] initial commit of ck_pring: non-blocking *P*S circular buffer/disruptor --- include/ck_pring.h | 146 ++++++++++++++++ include/ck_pring/common.h | 138 +++++++++++++++ include/ck_pring/dequeue.h | 332 +++++++++++++++++++++++++++++++++++++ include/ck_pring/enqueue.h | 143 ++++++++++++++++ include/ck_pring/snoop.h | 93 +++++++++++ src/Makefile.in | 11 +- src/ck_pring.c | 109 ++++++++++++ src/ck_pring/dequeue.c | 312 ++++++++++++++++++++++++++++++++++ src/ck_pring/enqueue.c | 314 +++++++++++++++++++++++++++++++++++ src/ck_pring/snoop.c | 134 +++++++++++++++ 10 files changed, 1731 insertions(+), 1 deletion(-) create mode 100644 include/ck_pring.h create mode 100644 include/ck_pring/common.h create mode 100644 include/ck_pring/dequeue.h create mode 100644 include/ck_pring/enqueue.h create mode 100644 include/ck_pring/snoop.h create mode 100644 src/ck_pring.c create mode 100644 src/ck_pring/dequeue.c create mode 100644 src/ck_pring/enqueue.c create mode 100644 src/ck_pring/snoop.c diff --git a/include/ck_pring.h b/include/ck_pring.h new file mode 100644 index 0000000..734b012 --- /dev/null +++ b/include/ck_pring.h @@ -0,0 +1,146 @@ +#ifndef CK_PRING_H +#define CK_PRING_H +#include +#include +#include +#include +#include + +/** + * Lock-free *P*C disruptor for pointer-sized values. + * + * The consumer and producer sides may be independently specialized to + * the single producer/consumer case. + * + * On my 2.2 GHz Haswell laptop, the best-case inverse throughput for: + * - SP is ~4 cycles/enqueue; + * - SC is ~5 cycles/dequeue; + * - MP is ~24 cycles/enqueue; + * - MC is ~27 cycles/dequeue. + * + * Consumption is slightly slower than production, but consumption can + * be batched. + * + * The ring buffer for the pointer-sized values is an array of + * struct ck_pring_elt, pairs of: + * - generation counter; + * - value. + * + * The generation counter prevents ABA: when a producer is slow, it's + * hard to tell if a ring buffer element is *still* empty, or *newly* + * empty. + * + * This ring buffer attempts to minimise the communication (cache + * coherency traffic) between classes of threads (producers and + * consumers). Producers access the `prod` field, read/write to the + * ring buffer, and only read the `cons` field to refresh their cache + * of the consumer(s) cursor. Consumers access the `cons` field, and + * read the ring buffer; they *never* access the `prod` field. + * + * Production thus mostly incurs traffic for communications between + * producers (hard to avoid), any compulsory traffic on the ring + * buffer (if the buffer is large enough, this is only between + * producers). Producers only interact with consumers when their + * snapshot of the consumer cursor hints that the buffer might be + * full. If consumers keep up with producers, this should only happen + * ~once per buffer_size enqueue. + * + * Consumption only incurs traffic for communications between + * consumers (racing on the consumption cursor), and any compulsory + * traffic to read the ring buffer (negligible with a large enough + * buffer). + * + * Producers race along an unbounded sequence of generation counters + * (64 bit pointers) to atomically acquire empty cells and fill them + * with their value. The actual storage for this unbounded sequence + * is the ring buffer: the cell for sequence id x is `x % ring_size`. 
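+ *
+ * As a concrete (illustrative) example: with ring_size = 8, sequence
+ * ids 3, 11 and 19 all map to cell 3; the generation counter stored
+ * in that cell is what distinguishes "cell 3 currently holds
+ * sequence 11" from "cell 3 still holds sequence 3".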
+ * + * The cell is available if its generation value is < the generation + * counter we wish to acquire. The cell must also have been released + * by the consumer(s). The producer struct has a cache of the + * consumer(s)'s cursor; any generation value strictly less than that + * cursor is fair game. Of course, the cache may be stale (but must + * never be ahead of the actual value); when producers notice that the + * cached consumer cursor is too low, they update it before exiting + * with failure (full ring). + * + * The linear search over an unbounded sequence of generation counter + * isn't practical. Producers maintain an approximate cache of the + * next free generation counter: when a producer succeeds at producing + * a value for sequence counter `gen`, it (racily) stores `gen + 1` in + * prod.cursor. Producers thus begin their search at `prod.cursor`. + * That search is bounded: it can't go further than the consumer's + * cursor + ring_size - 1. Producers will only retry if the + * consumer's cursor has moved ahead, in which case the system has + * made global progress. + * + * Concurrent production uses a double-wide CAS (CMPXCHG16B) to + * atomically acquire/update the generation counter and set the value + * (we only need single compare / double-wide set), which trivially + * guarantees visibility (on TSO). For the SP case, we must be + * careful to write the value before publishing it by updating the + * generation. + * + * Consuming is easier. The consumer(s) maintain a cursor for the + * next sequence value to consume. The storage for that sequence + * is always at `cursor % ring_size`. They simply have to wait + * until that cell is populated with the correct sequence value, + * and update/race on cons.cursor. + * + * The value in cons.cursor imposes a limit on the largest generation + * that could be produced, so it's safe to: + * 1. read the generation; + * 2. read the value; + * 3. update cons.cursor. + * + * If the generation matches at step 1, it must also match at step 2, + * since it can only increase after step 3. This assumes fences + * between each step... fences that are happily no-ops on TSO. + */ + +/** + * Size in bytes for a pring with n_consumer consumers. + */ +size_t +ck_pring_allocation_size(size_t n_consumer); + +/** + * Initialize a pring of n_consumer + */ +void +ck_pring_init(struct ck_pring *, size_t n_consumer, struct ck_pring_elt *, size_t); + +/** + * Allocate a pring for n consumers, with buf of bufsz elements. + * + * If consumers have dependencies, the caller is expected to set up + * the dependency begin/end half-open ranges: a consumer block with + * dependency [begin, end) will only process items after they have + * been consumed by consumer ids [begin, end). + */ +struct ck_pring * +ck_pring_create(struct ck_malloc *, + size_t n_consumer, struct ck_pring_elt *buf, size_t bufsz); + +/** + * Deallocate the pring *but not the buffer*. + */ +void +ck_pring_destroy(struct ck_malloc *, struct ck_pring *); + +static inline size_t +ck_pring_size(const struct ck_pring *); + +struct ck_pring_elt * +ck_pring_buffer(const struct ck_pring *); + +/** + * Inline implementation. 
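+ *
+ * Minimal single-producer/single-consumer sketch (the buffer size and
+ * the enqueued value below are illustrative, not requirements):
+ *
+ *	static struct ck_pring_elt buffer[1024];
+ *	struct ck_pring ring;
+ *
+ *	ck_pring_init(&ring, 1, buffer, 1024);
+ *	if (ck_pring_senqueue(&ring, (uintptr_t)42)) {
+ *		assert(ck_pring_sdequeue(&ring, 0) == 42);
+ *	}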
+ */ +static inline size_t +ck_pring_size(const struct ck_pring *ring) +{ + + return 1 + ring->prod.mask; +} +#endif /* !CK_PRING_H */ diff --git a/include/ck_pring/common.h b/include/ck_pring/common.h new file mode 100644 index 0000000..5c9f625 --- /dev/null +++ b/include/ck_pring/common.h @@ -0,0 +1,138 @@ +#ifndef PRING_COMMON_H +#define PRING_COMMON_H +#include +#include +#include +#include +#include +#include + +struct ck_pring_elt { + char *gen; + uintptr_t value; +} CK_CC_ALIGN(2 * sizeof(void *)); + +/** + * State for producers. + * + * consumer_snap is a lower bound on the oldest consumer cursor. + * We must not write past consumer_snap + mask. It is monotonic + * in the SP case, best effort otherwise. + * + * cursor is a lower bound on the next empty element. It is exact + * (and thus monotonic) in the single-producer case, best effort for + * MP. + */ +struct ck_pring_producer { + uint64_t consumer_snap; /* <= cons.cursor, accessed by prods. */ + uint64_t cursor; /* <= real prod.cursor, accessed by prods & snoops. */ + struct ck_pring_elt *buf; /* write once */ + uint64_t mask; /* write once */ + size_t n_consumer; + uintptr_t dummy; +}; + +/** + * State for consumers. + * + * cursor is exact (and thus monotonic), the index for the next value + * we wish to read. any sequence < cursor is safe to overwrite (which + * lets writers go up to (cursor - 1 + size) = cursor + mask). + * + * read_limit is a lower bound on the exclusive range that is safe to + * read (i.e., we can always read non-empty sequence < read_limit). + * Normal consumers, without consumer dependencies, mostly ignore this + * field (it is set to arbitrarily high values). Consumers with + * dependencies use this field to make sure they never overtake their + * parents: at any time, for any parent, read_limit <= parent.cursor. + * read_limit is monotonic for SC, best effort for MC. + * + * dependency_begin, end mark the half-open range of this consumer's + * parents. + */ +struct ck_pring_consumer { + const struct ck_pring_elt *buf; /* write-once. */ + uint64_t mask; /* write-once. */ + uint64_t cursor; /* written by consumers, read by all. */ + uint64_t read_limit; /* private. cursor < read_limit. */ + uint32_t dependency_begin; /* write once */ + uint32_t dependency_end; /* write once */ +}; + +/** + * A snooper is a consumer hidden from the rest of the system. + * Instead of relying on cursor to protect reads, snoopers must check + * the generation counter on ring buffer elements. + */ +struct ck_pring_snooper { + struct ck_pring_consumer cons; +}; + +struct ck_pring_consumer_block { + struct ck_pring_consumer cons; + char padding[CK_MD_CACHELINE - sizeof(struct ck_pring_consumer)]; +} CK_CC_CACHELINE; + +/** + * Pack more consumer blocks immediately after ck_pring for + * multi-consumer allocations. + */ +struct ck_pring { + struct ck_pring_producer prod; + char padding[CK_MD_CACHELINE - sizeof(struct ck_pring_producer)]; + struct ck_pring_consumer_block cons; +} CK_CC_CACHELINE; + +#define CK_PRING_INIT(BUF) \ + CK_PRING_INIT_((BUF), \ + CK_PRING_PO2_(sizeof(BUF) / sizeof(struct ck_pring_elt)), \ + 1) + +/** + * Inline internals. + */ + +/** + * Return the data block for consumer index (< n_consumer). + */ +static CK_CC_FORCE_INLINE struct ck_pring_consumer * +ck_pring_consumer_by_id(struct ck_pring *ring, size_t index) +{ + struct ck_pring_consumer_block *consumers = &ring->cons; + + /* index should only be > 0 with a heap-allocated tail of consumers. 
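+	 * ck_pring_init packs those extra consumer blocks immediately
+	 * after the struct ck_pring, so indexing past &ring->cons stays
+	 * within the same allocation.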
*/ + return &consumers[index].cons; +} + +/** + * Update the read limit for the consumer. If there are no + * dependencies, arranges updates to make sure it's called very + * rarely. + * + * Return the new capacity if it is strictly positive, 0 otherwise. + */ +uintptr_t +ck_pring_consumer_update_limit(struct ck_pring_consumer *, + const struct ck_pring *); + +#define CK_PRING_INIT_(BUF, SIZE, N_CONSUMER) \ + { \ + .prod = { \ + .consumer_snap = (SIZE), \ + .cursor = (SIZE), \ + .buf = (BUF), \ + .mask = (SIZE) - 1, \ + .n_consumer = (N_CONSUMER) \ + }, \ + .cons = { \ + .cons = { \ + .buf = (BUF), \ + .mask = (SIZE) - 1, \ + .cursor = (SIZE) \ + } \ + } \ + } + +#define CK_PRING_PO2_(X) \ + ((X) * sizeof(char[2 * !!((X) > 0 && ((X) & ((X) - 1)) == 0) - 1])) +#endif /* !PRING_COMMON_H */ diff --git a/include/ck_pring/dequeue.h b/include/ck_pring/dequeue.h new file mode 100644 index 0000000..5dca476 --- /dev/null +++ b/include/ck_pring/dequeue.h @@ -0,0 +1,332 @@ +#ifndef PRING_DEQUEUE_H +#define PRING_DEQUEUE_H +#include + +/** + * Approximately how many entries are available for consumption. Only + * useful if dependencies are active. + */ +static inline size_t +ck_pring_consume_capacity(struct ck_pring *, size_t index); + +/** + * Dequeue a value from a single-threaded consumer block. + * + * Return 0 if there is no new value in the pring. + */ +static inline uintptr_t +ck_pring_sdequeue(struct ck_pring *, size_t index); + +/** + * Read a value from a single-threaded consumer block. + * + * Return 0 if there is no new value in the pring. + */ +static inline uintptr_t +ck_pring_sread(struct ck_pring *, size_t index); + +/** + * Consume the last value returned by sread for a single-thread consumer block. + */ +static inline void +ck_pring_sconsume(struct ck_pring *, size_t index); + +/** + * Dequeue up to n values from a single-thread consumer block. + * + * Return the number of values read and written to dst. + */ +size_t +ck_pring_sdequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n); + +/** + * Read up to n values from a single-thread consumer block. + * + * Return the number of values read and written to dst. + */ +size_t +ck_pring_sread_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n); + +/** + * Consume the last n values returned by sread_n. + */ +static inline void +ck_pring_sconsume_n(struct ck_pring *, size_t index, size_t n); + +/** + * Dequeue a value from a multi-consumer block. + * + * Return 0 if there is no new value. + */ +static inline uintptr_t +ck_pring_mdequeue(struct ck_pring *, size_t index); + +static inline uintptr_t +ck_pring_mtrydequeue(struct ck_pring *, size_t index); + +static inline uintptr_t +ck_pring_mread(struct ck_pring *, size_t index, uint64_t *OUT_gen); + +static inline uintptr_t +ck_pring_mtryread(struct ck_pring *, size_t index, uint64_t *OUT_gen); + +static inline bool +ck_pring_mconsume(struct ck_pring *, size_t index, uint64_t gen); + +static inline size_t +ck_pring_mdequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n); + +static inline size_t +ck_pring_mtrydequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n); + +static inline size_t +ck_pring_mread_n(struct ck_pring *, size_t index, + uintptr_t *dst, size_t n, uint64_t *OUT_gen); + +static inline size_t +ck_pring_mtryread_n(struct ck_pring *, size_t index, + uintptr_t *dst, size_t n, uint64_t *OUT_gen); + +static inline bool +ck_pring_mconsume_n(struct ck_pring *, size_t index, uint64_t gen, size_t n); + +/** + * Inline implementation. 
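+ *
+ * Sketch of the multi-consumer read/consume pattern (consumer index 0
+ * and handle() below are illustrative; ring is a struct ck_pring *):
+ *
+ *	uint64_t gen;
+ *	uintptr_t v = ck_pring_mread(ring, 0, &gen);
+ *
+ *	if (v != 0 && ck_pring_mconsume(ring, 0, gen)) {
+ *		handle(v);
+ *	}
+ *
+ * If ck_pring_mconsume fails, another consumer in the same block won
+ * the race for that generation and the value must not be processed.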
+ */ +static inline size_t +ck_pring_consume_capacity(struct ck_pring *ring, size_t index) +{ + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + uint64_t cursor = cons->cursor; + uint64_t limit = cons->read_limit; + + if (CK_CC_UNLIKELY((int64_t)(cursor - limit) >= 0)) { + return ck_pring_consumer_update_limit(cons, ring); + } + + return limit - cursor; +} + +uintptr_t ck_pring_sdequeue_slow(struct ck_pring *, size_t); + +static inline uintptr_t +ck_pring_sdequeue(struct ck_pring *ring, size_t index) +{ + struct ck_pring_elt snap; + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t cursor = cons->cursor; /* only writer is us. */ + size_t loc = cursor & mask; + + if (CK_CC_UNLIKELY((int64_t)(cursor - cons->read_limit) >= 0)) { + /* + * This only happens with dependencies, when consumers + * catch up to the parents. This will always be + * complex, and I don't feel bad about slowing down + * consumers that are going too fast. + */ + return ck_pring_sdequeue_slow(ring, index); + } + + /* + * We know where the next element we wish to consume lives. + * Load its generation *before* its value. If the generation + * matches our cursor, consume the element and return the + * value. + */ + snap.gen = ck_pr_load_ptr(&buf[loc].gen); +#ifdef assert + assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 && + "Concurrent dequeue in sdequeue?"); +#endif + ck_pr_fence_load(); + /* read gen before value. cursor is always an upper bound on gen. */ + snap.value = ck_pr_load_ptr(&buf[loc].value); + if (CK_CC_UNLIKELY((uint64_t)snap.gen != cursor)) { + /* + * This will tend to be either always false (normal + * operations) or always true (queue is empty); a cmov + * would be easy but would be slightly slower than a + * predictable branch in both cases. + */ + return 0; + } + + ck_pr_fence_load_store(); + /* + * Producers will read cons.cursor. Make sure to consume the + * cell's value before releasing, and make sure cursor is safe + * to read by other threads. + */ + ck_pr_store_64(&cons->cursor, cursor + 1); + return snap.value; +} + +uintptr_t ck_pring_sread_slow(struct ck_pring *, size_t); + +static inline uintptr_t +ck_pring_sread(struct ck_pring *ring, size_t index) +{ + struct ck_pring_elt snap; + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t cursor = cons->cursor; + size_t loc = cursor & mask; + + if (CK_CC_UNLIKELY((int64_t)(cursor - cons->read_limit) >= 0)) { + return ck_pring_sread_slow(ring, index); + } + + snap.gen = ck_pr_load_ptr(&buf[loc].gen); +#ifdef assert + assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 && + "Concurrent dequeue in sread?"); +#endif + ck_pr_fence_load(); + snap.value = ck_pr_load_ptr(&buf[loc].value); + return ((uint64_t)snap.gen == cursor) ? 
snap.value : 0; +} + +static inline void +ck_pring_sconsume(struct ck_pring *ring, size_t index) +{ + + ck_pring_sconsume_n(ring, index, 1); + return; +} + +static inline void +ck_pring_sconsume_n(struct ck_pring *ring, size_t index, size_t n) +{ + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + + ck_pr_fence_load_store(); + ck_pr_store_64(&cons->cursor, ring->cons.cons.cursor + n); + return; +} + +uintptr_t +ck_pring_mdequeue_generic(struct ck_pring *ring, size_t index, bool hard); + +static inline uintptr_t +ck_pring_mdequeue(struct ck_pring *ring, size_t index) +{ + + return ck_pring_mdequeue_generic(ring, index, true); +} + +static inline uintptr_t +ck_pring_mtrydequeue(struct ck_pring *ring, size_t index) +{ + + return ck_pring_mdequeue_generic(ring, index, false); +} + +uintptr_t ck_pring_mread_slow(struct ck_pring *, size_t, uint64_t *, bool); + +static inline uintptr_t +ck_pring_mread_generic(struct ck_pring *ring, size_t index, + uint64_t *OUT_gen, bool hard) +{ + struct ck_pring_elt snap; + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t cursor = ck_pr_load_64(&cons->cursor); + uint64_t read_limit = ck_pr_load_64(&cons->read_limit); + size_t loc = cursor & mask; + + if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) { + goto slow; + } + + snap.gen = ck_pr_load_ptr(&buf[loc].gen); + ck_pr_fence_load(); + snap.value = ck_pr_load_ptr(&buf[loc].value); + if (CK_CC_UNLIKELY((int64_t)((uint64_t)snap.gen - cursor) < 0)) { + return 0; + } + + ck_pr_fence_load(); + if (CK_CC_UNLIKELY((uint64_t)ck_pr_load_ptr(&buf[loc].gen) != cursor)) { + goto slow; + } + + return snap.value; + +slow: + return ck_pring_mread_slow(ring, index, OUT_gen, hard); +} + +static inline uintptr_t +ck_pring_mread(struct ck_pring *ring, size_t index, uint64_t *OUT_gen) +{ + + return ck_pring_mread_generic(ring, index, OUT_gen, true); +} + +static inline uintptr_t +ck_pring_mtryread(struct ck_pring *ring, size_t index, uint64_t *OUT_gen) +{ + + return ck_pring_mread_generic(ring, index, OUT_gen, false); +} + +static inline bool +ck_pring_mconsume(struct ck_pring *ring, size_t index, uint64_t gen) +{ + + return ck_pring_mconsume_n(ring, index, gen, 1); +} + +size_t +ck_pring_mdequeue_n_generic(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n, bool hard); + +inline size_t +ck_pring_mdequeue_n(struct ck_pring *ring, size_t index, uintptr_t *dst, size_t n) +{ + + return ck_pring_mdequeue_n_generic(ring, index, dst, n, true); +} + +inline size_t +ck_pring_mtrydequeue_n(struct ck_pring *ring, size_t index, uintptr_t *dst, size_t n) +{ + + return ck_pring_mdequeue_n_generic(ring, index, dst, n, false); +} + +static inline bool +ck_pring_mconsume_n(struct ck_pring *ring, size_t index, uint64_t gen, size_t n) +{ + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + + ck_pr_fence_load_store(); + return ck_pr_cas_64(&cons->cursor, gen, gen + n); +} + +size_t +ck_pring_mread_n_generic(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n, uint64_t *OUT_gen, bool hard); + +static inline size_t +ck_pring_mread_n(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n, uint64_t *OUT_gen) +{ + + return ck_pring_mread_n_generic(ring, index, + dst, n, OUT_gen, true); +} + +static inline size_t +ck_pring_mtryread_n(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n, uint64_t *OUT_gen) +{ + + return ck_pring_mread_n_generic(ring, 
index, + dst, n, OUT_gen, false); +} +#endif /* !PRING_DEQUEUE_H */ diff --git a/include/ck_pring/enqueue.h b/include/ck_pring/enqueue.h new file mode 100644 index 0000000..ee33fa7 --- /dev/null +++ b/include/ck_pring/enqueue.h @@ -0,0 +1,143 @@ +#ifndef PRING_ENQUEUE_H +#define PRING_ENQUEUE_H +#include + +/** + * Return an approximation of the remaining capacity in the ring. + * + * Exact for single producer. + */ +static inline size_t +ck_pring_enqueue_capacity(struct ck_pring *); + +/** + * Attempt to enqueue one value in a single-producer pring. + * + * Return true iff success. + */ +static inline bool +ck_pring_senqueue(struct ck_pring *, uintptr_t); + +/** + * Attempt to enqueue one value in a single-producer pring. + * + * Return true iff success, in which case old_value was overwritten + * with the value previously in the ring buffer element. The value + * is unspecified on failure. + */ +static inline bool +ck_pring_senqueue_val(struct ck_pring *, uintptr_t, uintptr_t *old_value); + +/** + * Attempt to enqueue up to n values in a single-producer pring. + * + * Return the number of values enqueued; values[0 .. ret) is updated + * with the value previously in the ring buffer elements. + */ +size_t +ck_pring_senqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n); + +/** + * Attempt to enqueue one value in a multi-producer pring. + * + * Return true iff success. + */ +static inline bool +ck_pring_menqueue(struct ck_pring *, uintptr_t); + +/** + * Attempt to enqueue one value in a multi-producer pring. + * + * Return true iff success, in which case old_value was overwritten + * with the value previously in the ring buffer element. The value + * is unspecified on failure. + */ +bool +ck_pring_menqueue_val(struct ck_pring *, uintptr_t, uintptr_t *old_value); + +/** + * Attempt to enqueue up to n values in a single-producer pring. + * + * Return the number of values enqueued; values[0 .. ret) is updated + * with the value previously in the ring buffer elements. + */ +size_t +ck_pring_menqueue_n(struct ck_pring *, uintptr_t *values, size_t n); + +/** + * Inline implementation. + */ +static inline bool +ck_pring_menqueue(struct ck_pring *ring, uintptr_t value) +{ + uintptr_t buf; + + (void)buf; + return ck_pring_menqueue_val(ring, value, &buf); +} + +size_t +ck_pring_enqueue_capacity_slow(struct ck_pring *ring); + +static inline size_t +ck_pring_enqueue_capacity(struct ck_pring *ring) +{ + uint64_t mask = ring->prod.mask; + uint64_t consumer_snap = ring->prod.consumer_snap; + uint64_t cursor = ring->prod.cursor; + + if (CK_CC_UNLIKELY(cursor - consumer_snap > mask)) { + return ck_pring_enqueue_capacity_slow(ring); + } + + return (consumer_snap + mask + 1) - cursor; +} + +static inline bool +ck_pring_senqueue(struct ck_pring *ring, uintptr_t value) +{ + + return ck_pring_senqueue_val(ring, value, &ring->prod.dummy); +} + +bool +ck_pring_senqueue_val_slow(struct ck_pring *ring, uintptr_t value, + uintptr_t *old_value); + +static inline bool +ck_pring_senqueue_val(struct ck_pring *ring, uintptr_t value, + uintptr_t *old_value) +{ + struct ck_pring_elt *buf = ring->prod.buf; + struct ck_pring_elt *dst; + uint64_t mask = ring->prod.mask; + /* only writer to prod.* is us. */ + uint64_t consumer_snap = ring->prod.consumer_snap; + uint64_t cursor = ring->prod.cursor; + size_t loc = cursor & mask; + + /* + * We know where we want to write. 
Make sure our snapshot of + * the consumer cursor lets us write there (or update the + * snapshot), and write the value *before* publishing + * the new generation. + */ + dst = &buf[loc]; +#ifdef __GNUC__ + __asm__("" : "+r"(dst)); /* compute dst before the branch. */ +#endif + if (CK_CC_UNLIKELY((cursor - consumer_snap) > mask)) { + return ck_pring_senqueue_val_slow(ring, value, old_value); + } + + /* We're not too far. do the write! */ + *old_value = dst->value; + ck_pr_store_ptr((void **)&dst->value, (void *)value); + ck_pr_fence_store(); + ck_pr_store_ptr(&dst->gen, (void *)cursor); + + ck_pr_fence_store(); + ck_pr_store_64(&ring->prod.cursor, cursor + 1); + return true; +} +#endif /* !PRING_ENQUEUE_H */ diff --git a/include/ck_pring/snoop.h b/include/ck_pring/snoop.h new file mode 100644 index 0000000..ed09a16 --- /dev/null +++ b/include/ck_pring/snoop.h @@ -0,0 +1,93 @@ +#ifndef PRING_SNOOP_H +#define PRING_SNOOP_H +#include + +/** + * Initialize a snooper block. Snoopers are consumers that do not + * block producers. dep_begin, dep_end specifies a range of consumer + * ids not to overtake. + */ +void +ck_pring_snoop_init(struct ck_pring_snooper *, const struct ck_pring *, + uint32_t dep_begin, uint32_t dep_end); + +/** + * Approximately how many entries are available for snooping. Only + * useful if dependencies are active. + */ +static inline size_t +ck_pring_snoop_capacity(struct ck_pring_snooper *, const struct ck_pring *); + +/** + * Snoop the next value from the pring. + * + * Return 0 on failure. + */ +static inline uintptr_t +ck_pring_snoop(struct ck_pring_snooper *, const struct ck_pring *); + +/** + * Snoop up to n values from the pring. + * + * Return the number of values snooped and written to dst. + */ +size_t +ck_pring_snoop_n(struct ck_pring_snooper *, const struct ck_pring *, + uintptr_t *dst, size_t n); + +/** + * Inline implementation. + */ +static inline size_t +ck_pring_snoop_capacity(struct ck_pring_snooper *snoop, + const struct ck_pring *ring) +{ + uint64_t cursor = snoop->cons.cursor; + uint64_t limit = snoop->cons.read_limit; + + if (CK_CC_UNLIKELY((int64_t)(cursor - limit) >= 0)) { + return ck_pring_consumer_update_limit(&snoop->cons, ring); + } + + return limit - cursor; +} + +uintptr_t +ck_pring_snoop_slow(struct ck_pring_snooper *snoop, + const struct ck_pring *ring); + +static inline uintptr_t +ck_pring_snoop(struct ck_pring_snooper *snoop, const struct ck_pring *ring) +{ + struct ck_pring_elt snap; + const struct ck_pring_elt *buf = snoop->cons.buf; + uint64_t mask = snoop->cons.mask; + uint64_t cursor = snoop->cons.cursor; + size_t loc = cursor & mask; + + if (CK_CC_UNLIKELY((int64_t)(cursor - snoop->cons.read_limit) >= 0)) { + goto slow; + } + + snap.gen = ck_pr_load_ptr(&buf[loc].gen); + ck_pr_fence_load(); + /* read gen before value. cursor is an upper bound on gen. */ + snap.value = ck_pr_load_ptr(&buf[loc].value); + if (CK_CC_UNLIKELY((int64_t)((uint64_t)snap.gen - cursor) < 0)) { + /* gen is too old. queue is still empty. */ + return 0; + } + + ck_pr_fence_load(); + if (CK_CC_UNLIKELY((uint64_t)ck_pr_load_ptr(&buf[loc].gen) != cursor)) { + /* gen doesn't match and/or cursor is out of date; try again. 
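+		 * A snooper does not hold producers back, so the cell
+		 * may have been overwritten while we were reading it;
+		 * re-checking gen catches that case.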
*/ + goto slow; + } + + snoop->cons.cursor = cursor + 1; + return snap.value; + +slow: + return ck_pring_snoop_slow(snoop, ring); +} +#endif /* !PRING_SNOOP_H */ diff --git a/src/Makefile.in b/src/Makefile.in index 0d84e76..da74ff5 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -16,7 +16,11 @@ OBJECTS=ck_barrier_centralized.o \ ck_hp.o \ ck_hs.o \ ck_rhs.o \ - ck_array.o + ck_array.o \ + ck_pring.o \ + ck_pring_dequeue.o \ + ck_pring_enqueue.o \ + ck_pring_snoop.o all: $(ALL_LIBS) @@ -59,6 +63,11 @@ ck_barrier_tournament.o: $(SDIR)/ck_barrier_tournament.c ck_barrier_mcs.o: $(SDIR)/ck_barrier_mcs.c $(CC) $(CFLAGS) -c -o $(TARGET_DIR)/ck_barrier_mcs.o $(SDIR)/ck_barrier_mcs.c +ck_pring.o: $(SDIR)/ck_pring.c $(INCLUDE_DIR)/ck_pring.h $(INCLUDE_DIR)/ck_pring/common.h + $(CC) $(CFLAGS) -c -o $(TARGET_DIR)/ck_pring.o $(SDIR)/ck_pring.c + +ck_pring_%.o: $(SDIR)/ck_pring/%.c $(INCLUDE_DIR)/ck_pring.h $(INCLUDE_DIR)/ck_pring/common.h $(INCLUDE_DIR)/ck_pring/%.h + $(CC) $(CFLAGS) -c -o $@ $< clean: rm -rf $(TARGET_DIR)/*.dSYM $(TARGET_DIR)/*~ $(TARGET_DIR)/*.o \ $(OBJECTS) $(TARGET_DIR)/libck.a $(TARGET_DIR)/libck.so diff --git a/src/ck_pring.c b/src/ck_pring.c new file mode 100644 index 0000000..3b4c3ad --- /dev/null +++ b/src/ck_pring.c @@ -0,0 +1,109 @@ +#define CK_PRING_IMPL +#include +#include + +#include + +size_t +ck_pring_allocation_size(size_t n_consumer) +{ + size_t consumer_size; + size_t extra = 0; + + if (n_consumer > 0) { + extra = n_consumer - 1; + } + + consumer_size = extra * sizeof(struct ck_pring_consumer_block); + return sizeof(struct ck_pring) + consumer_size; +} + +void +ck_pring_init(struct ck_pring *ring, size_t n_consumer, + struct ck_pring_elt *buf, size_t size) +{ + struct ck_pring_consumer_block *next = (void *)(ring + 1); + + assert(size > 0 && (size & (size - 1)) == 0); + *ring = (struct ck_pring) CK_PRING_INIT_(buf, size, n_consumer); + for (size_t i = 1; i < n_consumer; i++) { + next[i - 1].cons = ring->cons.cons; + } + + ck_pr_fence_store(); + return; +} + +struct ck_pring * +ck_pring_create(struct ck_malloc *ck_malloc, + size_t n_consumer, struct ck_pring_elt *buf, size_t bufsz) +{ + struct ck_pring *ret; + size_t ringsz; + + ringsz = ck_pring_allocation_size(n_consumer); + ret = ck_malloc->malloc(ringsz); + memset(ret, 0, ringsz); + ck_pring_init(ret, n_consumer, buf, bufsz); + return ret; +} + +void +ck_pring_destroy(struct ck_malloc *ck_malloc, struct ck_pring *ring) +{ + size_t ringsz; + + if (ring == NULL) { + return; + } + + ringsz = ck_pring_allocation_size(ring->prod.n_consumer); + ck_malloc->free(ring, ringsz, false); + return; +} + +struct ck_pring_elt * +ck_pring_buffer(const struct ck_pring *ring) +{ + + return ring->prod.buf; +} + +/* Declared in pring_common.h */ +uintptr_t +ck_pring_consumer_update_limit(struct ck_pring_consumer *consumer, + const struct ck_pring *ring) +{ + const struct ck_pring_consumer_block *consumers = &ring->cons; + uint64_t old_limit = ck_pr_load_64(&consumer->read_limit); + uint64_t limit = old_limit + (1UL << 60); + uint64_t capacity; + size_t dep_begin = consumer->dependency_begin; + size_t dep_end = consumer->dependency_end; + + /* Common case: no dependency. 
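+	 * Without dependencies, the limit is simply pushed ~2^60
+	 * sequence ids ahead of the old one, so the fast paths
+	 * essentially never have to come back here.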
*/ + if (CK_CC_LIKELY(dep_begin >= dep_end)) { + capacity = consumer->mask + 1; + ck_pr_store_64(&consumer->read_limit, limit); + return capacity; + } + + for (size_t i = dep_end; i --> dep_begin; ) { + const struct ck_pring_consumer *current = &consumers[i].cons; + uint64_t current_cursor; + size_t begin = current->dependency_begin; + size_t skip; + + current_cursor = ck_pr_load_64(¤t->cursor); + skip = (current->dependency_end >= i) ? begin : i; + if ((int64_t)(current_cursor - limit) < 0) { + limit = current_cursor; + } + + i = (skip < i) ? skip : i; + } + + capacity = limit - ck_pr_load_64(&consumer->cursor); + ck_pr_store_64(&consumer->read_limit, limit); + return ((int64_t)capacity > 0) ? capacity : 0; +} diff --git a/src/ck_pring/dequeue.c b/src/ck_pring/dequeue.c new file mode 100644 index 0000000..22eccac --- /dev/null +++ b/src/ck_pring/dequeue.c @@ -0,0 +1,312 @@ +#include +#include + +#include + +uintptr_t +ck_pring_sdequeue_slow(struct ck_pring *ring, size_t index) +{ + uintptr_t buf[1] = { 0 }; + uintptr_t n; + + n = ck_pring_sdequeue_n(ring, index, buf, 1); + return buf[0] & -n; +} + +uintptr_t +ck_pring_sread_slow(struct ck_pring *ring, size_t index) +{ + uintptr_t buf[1] = { 0 }; + uintptr_t n; + + n = ck_pring_sread_n(ring, index, buf, 1); + return buf[0] & -n; +} + +size_t +ck_pring_sdequeue_n(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n) +{ + size_t read; + + read = ck_pring_sread_n(ring, index, dst, n); + ck_pring_sconsume_n(ring, index, read); + return read; +} + +size_t +ck_pring_sread_n(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n) +{ + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t base_cursor = cons->cursor; /* only writer is us. */ + uint64_t read_limit = ck_pr_load_64(&cons->read_limit); + size_t capacity = read_limit - base_cursor; + size_t consumed; + + if (CK_CC_UNLIKELY((int64_t)(base_cursor - read_limit) >= 0)) { + capacity = ck_pring_consumer_update_limit(cons, ring); + if (capacity == 0) { + return capacity; + } + } + + n = (n > capacity) ? capacity : n; + /* + * No check for n == 0. This situation should be rare, and + * the code below correctly handles it. + */ + + /* + * See if we can immediately read n values. We know values + * from base_cursor onward have not been overwritten. We only + * have to check if the last item we wish to read has been + * produced before copying everything. + */ + { + struct ck_pring_elt snap; + uint64_t last_cursor = base_cursor + n - 1; + size_t last_loc = last_cursor & mask; + + snap.gen = ck_pr_load_ptr(&buf[last_loc].gen); + ck_pr_fence_load(); + if (CK_CC_UNLIKELY((uint64_t)snap.gen != last_cursor)) { + goto slow; + } + + for (size_t i = 0; i < n; i++) { + uint64_t cursor = base_cursor + i; + size_t loc = cursor & mask; + + dst[i] = ck_pr_load_ptr(&buf[loc].value); + } + + return n; + } + +slow: + for (consumed = 0; consumed < n; consumed++) { + struct ck_pring_elt snap; + uint64_t cursor = base_cursor + consumed; + size_t loc = cursor & mask; + + snap.gen = ck_pr_load_ptr(&buf[loc].gen); + ck_pr_fence_load(); + snap.value = ck_pr_load_ptr(&buf[loc].value); + if ((uint64_t)snap.gen != cursor) { + assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 && + "Concurrent dequeue in sdequeue?"); + break; + } + + dst[consumed] = snap.value; + } + + return consumed; +} + +/** + * Update snap with the value for cursor in the ring buffer. 
+ * Returns a comparator value for generation ?= cursor. + * 0 means we have the value we're looking for. + * Negative means the value is older than expected (the cell + * is empty). + * Positive means the value is younger than expected (we + * were too slow and lost the race). + */ +static inline int64_t +preacquire(struct ck_pring_elt *snap, + const struct ck_pring_elt *buf, uint64_t mask, uint64_t cursor) +{ + size_t loc = cursor & mask; + + snap->gen = ck_pr_load_ptr(&buf[loc].gen); + ck_pr_fence_load(); + snap->value = ck_pr_load_ptr(&buf[loc].value); + + return (int64_t)((uint64_t)snap->gen - cursor); +} + +uintptr_t +ck_pring_mdequeue_generic(struct ck_pring *ring, size_t index, bool hard) +{ + struct ck_pring_elt snap; + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t cursor = ck_pr_load_64(&cons->cursor); + uint64_t read_limit = ck_pr_load_64(&cons->read_limit); + + if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) { + uintptr_t ret; + + ret = ck_pring_consumer_update_limit(cons, ring); + if (ret == 0) { + return ret; + } + } + + /* Fast path, assuming our cursor is up to date. */ + { + int64_t ret; + + ret = preacquire(&snap, buf, mask, cursor); + if (CK_CC_LIKELY(ret == 0)) { + /* + * The ring buffer element is up to date. + * Attempt to acquire it! + */ + if (CK_CC_LIKELY(ck_pr_cas_64(&cons->cursor, + cursor, cursor + 1))) { + return snap.value; + } + } else if (CK_CC_LIKELY(ret < 0)) { + /* + * The ring buffer element is too old (still + * empty). Fail immediately. + */ + return 0; + } + } + + { + uintptr_t arr[1] = { 0 }; + uintptr_t n; + + n = ck_pring_mdequeue_n_generic(ring, index, arr, 1, hard); + return arr[0] & -n; + } +} + +uintptr_t +ck_pring_mread_slow(struct ck_pring *ring, size_t index, + uint64_t *OUT_gen, bool hard) +{ + uintptr_t buf[1]; + uintptr_t n; + + n = ck_pring_mread_n_generic(ring, index, buf, 1, OUT_gen, hard); + return buf[0] & -n; +} + +size_t +ck_pring_mdequeue_n_generic(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n, bool hard) +{ + + for (;;) { + uint64_t gen; + size_t ret; + + ret = ck_pring_mread_n_generic(ring, index, dst, n, &gen, hard); + if (ret == 0 || ck_pring_mconsume_n(ring, index, gen, ret)) { + return ret; + } + + if (!hard) { + return 0; + } + + n = (n + 1) / 2; + } + + return 0; +} + +size_t +ck_pring_mread_n_generic(struct ck_pring *ring, size_t index, + uintptr_t *dst, size_t n, uint64_t *OUT_gen, bool hard) +{ + struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index); + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t base_cursor; + uint64_t read_limit; + size_t base_loc; + size_t capacity; + size_t consumed; + +retry: + base_cursor = ck_pr_load_64(&cons->cursor); + read_limit = ck_pr_load_64(&cons->read_limit); + base_loc = base_cursor & mask; + capacity = read_limit - base_cursor; + + if (CK_CC_UNLIKELY((int64_t)(base_cursor - read_limit) >= 0)) { + capacity = ck_pring_consumer_update_limit(cons, ring); + if (capacity == 0) { + return capacity; + } + } + + n = (n > capacity) ? 
capacity : n; + *OUT_gen = base_cursor; + + { + struct ck_pring_elt snap; + uint64_t last_cursor = base_cursor + n - 1; + size_t last_loc = last_cursor & mask; + + snap.gen = ck_pr_load_ptr(&buf[last_loc].gen); + ck_pr_fence_load(); + if (CK_CC_UNLIKELY((uint64_t)snap.gen != last_cursor)) { + goto slow; + } + + for (size_t i = 0; i < n; i++) { + uint64_t cursor = base_cursor + i; + size_t loc = cursor & mask; + + dst[i] = ck_pr_load_ptr(&buf[loc].value); + } + + ck_pr_fence_load(); + if (n <= 1 || + (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen) == base_cursor) { + return n; + } + + if (!hard) { + return 0; + } + + /* We started with snap.gen == last_cursor, so we lost a race. */ + n = 1; + goto retry; + } + +slow: + if (n == 0) { + return 0; + } + + for (consumed = 0; consumed < n; consumed++) { + struct ck_pring_elt snap; + uint64_t cursor = base_cursor + consumed; + size_t loc = cursor & mask; + + snap.gen = ck_pr_load_ptr(&buf[loc].gen); + ck_pr_fence_load(); + snap.value = ck_pr_load_ptr(&buf[loc].value); + if ((uint64_t)snap.gen != cursor) { + break; + } + + dst[consumed] = snap.value; + } + + if (consumed == 0 && hard) { + uint64_t gen; + + gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen); + /* Only retry if we lost the race. */ + if ((int64_t)(gen - base_cursor) > 0) { + n = 1; + goto retry; + } + } + + return consumed; +} diff --git a/src/ck_pring/enqueue.c b/src/ck_pring/enqueue.c new file mode 100644 index 0000000..7aa6b11 --- /dev/null +++ b/src/ck_pring/enqueue.c @@ -0,0 +1,314 @@ +#include +#include + +#include +#include +#include +#include + +static uint64_t +oldest_consumer_snap(const struct ck_pring *ring, uint64_t cursor) +{ + const struct ck_pring_consumer_block *consumers = &ring->cons; + size_t n_consumer = ring->prod.n_consumer; + uint64_t ret = cursor; + + for (size_t i = n_consumer; i --> 0; ) { + const struct ck_pring_consumer *current = &consumers[i].cons; + uint64_t current_cursor = ck_pr_load_64(¤t->cursor); + size_t skip = current->dependency_begin; + + if ((int64_t)(current_cursor - ret) < 0) { + ret = current_cursor; + } + + /* + * Current cursor includes [begin, end). If end >= i, + * we may skip everything down to and including begin: + * all skipped indices are covered by the current + * cursor. + */ + i = (current->dependency_end >= i) ? 
skip : i; + } + + return ret; +} + +size_t +ck_pring_enqueue_capacity_slow(struct ck_pring *ring) +{ + uint64_t mask = ring->prod.mask; + uint64_t consumer_snap; + uint64_t cursor = ring->prod.cursor; + + consumer_snap = oldest_consumer_snap(ring, cursor); + ck_pr_store_64(&ring->prod.consumer_snap, consumer_snap); + if (cursor - consumer_snap > mask) { + return 0; + } + + return (consumer_snap + mask + 1) - cursor; +} + +bool +ck_pring_senqueue_val_slow(struct ck_pring *ring, uintptr_t value, + uintptr_t *old_value) +{ + uint64_t mask = ring->prod.mask; + uint64_t consumer_snap; + uint64_t cursor = ring->prod.cursor; + + consumer_snap = oldest_consumer_snap(ring, cursor); + ring->prod.consumer_snap = consumer_snap; + if (cursor - consumer_snap > mask) { + return false; + } + + return ck_pring_senqueue_val(ring, value, old_value); +} + +size_t +ck_pring_senqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n) +{ + struct ck_pring_elt *buf = ring->prod.buf; + uint64_t mask = ring->prod.mask; + uint64_t base_cursor = ring->prod.cursor; + size_t capacity; + size_t produced; + + capacity = ck_pring_enqueue_capacity(ring); + if (n > capacity) { + n = capacity; + } + + for (produced = 0; produced < n; produced++) { + struct ck_pring_elt *dst; + uint64_t cursor = base_cursor + produced; + uintptr_t previous; + size_t loc = cursor & mask; + + dst = &buf[loc]; + previous = dst->value; + ck_pr_store_ptr((void **)&dst->value, (void *)values[produced]); + ck_pr_fence_store(); + ck_pr_store_ptr(&dst->gen, (void *)cursor); + + values[produced] = previous; + } + + ck_pr_fence_store(); + ck_pr_store_64(&ring->prod.cursor, base_cursor + produced); + return produced; +} + +/** + * Assuming that cursor is safe to overwrite (i.e., is at most mask + * ahead of the read-side cursor), attempt to overwrite an *older* + * record with our new value. + * + * Ret < 0 means success. + * Ret == 0 means we lost the race to update cursor. + * Ret > 0 means we lost the race by (at least) a full revolution + * around the ring buffer. + */ +static int64_t +try_menqueue_one(struct ck_pring_producer *snap, + uintptr_t value, + uint64_t cursor, + uintptr_t *old) +{ + struct ck_pring_elt expected, update; + struct ck_pring_elt *buf = snap->buf; + uint64_t actual_gen; + uint64_t mask = snap->mask; + uint64_t loc = cursor & mask; + int64_t ret; + + expected.value = ck_pr_load_ptr(&buf[loc].value); + /* no barrier here: the CAS will just fail. */ + expected.gen = ck_pr_load_ptr(&buf[loc].gen); + actual_gen = (uint64_t)expected.gen; + + ret = (int64_t)(actual_gen - cursor); + if (ret >= 0) { + /* we're trying to replace a fresh record. fail. */ + goto late; + } + + update.value = value; + update.gen = (void *)cursor; + if (CK_CC_LIKELY(ck_pr_cas_ptr_2_value(&buf[loc], &expected, &update, + &expected))) { + *old = expected.value; + return ret; + } + + /* + * if we failed, try again. the dwcas gave us a consistent + * read, so no spurious failure here. + */ + actual_gen = (uint64_t)expected.gen; + ret = (int64_t)(actual_gen - cursor); + if (ret >= 0) { + goto late; + } + + if (ck_pr_cas_ptr_2_value(&buf[loc], &expected, &update, + &expected)) { + *old = expected.value; + return ret; + } + + actual_gen = (uint64_t)expected.gen; + ret = 0; + +late: + /* + * If we're late, we know the next "free" generation value is + * at least one more than the one we just observed. 
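+	 * Recording that in snap->cursor lets the retry logic (and,
+	 * via ck_pring_menqueue_val, the shared prod.cursor hint)
+	 * skip ahead instead of rescanning generations that are
+	 * already taken.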
+ */ + snap->cursor = actual_gen + 1; + return ret; +} + +/** + * Bounded linear search for an empty cell, up to snap->cons.consumer_smap + mask; + * Return true on success, false otherwise. + * Update the snapshot's (write) cursor on failure. + * Update the producer-side cache on success. + */ +static inline bool +try_menqueue(struct ck_pring *ring, + struct ck_pring_producer *snap, + uintptr_t value, + uintptr_t *old) +{ + uint64_t consumer_snap = snap->consumer_snap; + uint64_t cursor = snap->cursor; + uint64_t mask = snap->mask; + + if ((int64_t)(cursor - consumer_snap) < 0) { + cursor = consumer_snap; + } + + for (; (cursor - consumer_snap) <= mask; cursor++) { + int64_t ret; + + ret = try_menqueue_one(snap, value, cursor, old); + if (ret > 0) { + /* + * we're really off. break out of here and + * update our snapshots. try_menqueue_one + * already updated snap->cursor. + */ + return false; + } + + /* + * Success! + */ + if (ret < 0) { + ck_pr_store_64(&ring->prod.cursor, cursor + 1); + ck_pr_store_64(&ring->prod.consumer_snap, + snap->consumer_snap); + return true; + } + } + + snap->cursor = cursor; + return false; +} + +bool +ck_pring_menqueue_val(struct ck_pring *ring, uintptr_t value, + uintptr_t *old) +{ + struct ck_pring_producer snap; + + snap.buf = ring->prod.buf; + snap.mask = ring->prod.mask; + snap.consumer_snap = ck_pr_load_64(&ring->prod.consumer_snap); + snap.cursor = ck_pr_load_64(&ring->prod.cursor); + + /* + * Fast path: snap.cursor isn't too far ahead. Immediately + * try to write there. + * We only access the producers' struct and the buffer. + */ + if (CK_CC_LIKELY((snap.cursor - snap.consumer_snap) <= snap.mask)) { + if (CK_CC_LIKELY( + try_menqueue_one(&snap, value, snap.cursor, old) < 0)) { + /* Success: racily update our local cursor and win. */ + ck_pr_store_64(&ring->prod.cursor, snap.cursor + 1); + return true; + } + } + + /* + * Slow path: update our private snapshot from the producers' + * and consumers' structs, until the consumers' cursor stops + * moving (if that happens, it's really time to fail). + */ + for (;;) { + uint64_t consumer_snap; + uint64_t prod_cursor; + + /* + * Linear search for an empty cell that's OK to + * overwrite. On success, nothing else to do: + * try_menqueue updates the producers' cache. + */ + if (try_menqueue(ring, &snap, value, old)) { + return true; + } + + prod_cursor = ck_pr_load_64(&ring->prod.cursor); + /* + * prod.cursor is a racy hint. Either update + * our cache or move the global cursor ahead. + */ + if ((int64_t)(prod_cursor - snap.cursor) > 0) { + snap.cursor = prod_cursor; + } else { + ck_pr_store_64(&ring->prod.cursor, snap.cursor); + } + + consumer_snap = oldest_consumer_snap(ring, snap.cursor); + /* No progress on the consumer's end. Stop trying.*/ + if (consumer_snap == snap.consumer_snap) { + uint64_t current_snap; + + /* Update the global snap if it's older than ours. */ + current_snap = ck_pr_load_64(&ring->prod.consumer_snap); + if ((int64_t)(consumer_snap - current_snap) < 0) { + ck_pr_store_64(&ring->prod.consumer_snap, + consumer_snap); + } + + break; + } + + snap.consumer_snap = consumer_snap; + } + + return false; +} + +size_t +ck_pring_menqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n) +{ + + /* XXX: do this better. 
*/ + for (size_t i = 0; i < n; i++) { + uintptr_t current = values[i]; + uintptr_t update; + + if (!ck_pring_menqueue_val(ring, current, &update)) { + return i; + } + + values[i] = update; + } + + return n; +} diff --git a/src/ck_pring/snoop.c b/src/ck_pring/snoop.c new file mode 100644 index 0000000..142c31c --- /dev/null +++ b/src/ck_pring/snoop.c @@ -0,0 +1,134 @@ +#include +#include + +#include + +static bool +snoop_update_cursor(struct ck_pring_snooper *snoop, + const struct ck_pring *ring, bool init) +{ + const struct ck_pring_elt *buf = snoop->cons.buf; + uint64_t mask = snoop->cons.mask; + uint64_t cursor = snoop->cons.cursor; + uint64_t new_cursor; + size_t loc = cursor & mask; + + if (snoop->cons.dependency_begin < snoop->cons.dependency_end) { + (void)ck_pring_consumer_update_limit(&snoop->cons, ring); + new_cursor = snoop->cons.read_limit - 1; + } else { + new_cursor = (uint64_t)ck_pr_load_ptr(&buf[loc].gen); + } + + if (!init && (int64_t)(cursor - new_cursor) >= 0) { + return false; + } + + snoop->cons.cursor = new_cursor; + return true; +} + +void +ck_pring_snoop_init(struct ck_pring_snooper *snoop, const struct ck_pring *ring, + uint32_t dep_begin, uint32_t dep_end) +{ + + snoop->cons.buf = ring->prod.buf; + snoop->cons.mask = ring->prod.mask; + snoop->cons.dependency_begin = dep_begin; + snoop->cons.dependency_end = dep_end; + (void)snoop_update_cursor(snoop, ring, true); + return; +} + +uintptr_t +ck_pring_snoop_slow(struct ck_pring_snooper *snoop, + const struct ck_pring *ring) +{ + uintptr_t ret[1] = { 0 }; + uintptr_t n; + + n = ck_pring_snoop_n(snoop, ring, ret, 1); + /* return 0 if n == 0, otherwise ret (and n == 1). */ + return ret[0] & -n; +} + +static ssize_t +ck_pring_snoop_n_inner(struct ck_pring_snooper *snoop, uintptr_t *dst, size_t n) +{ + struct ck_pring_consumer *cons = &snoop->cons; + const struct ck_pring_elt *buf = cons->buf; + uint64_t mask = cons->mask; + uint64_t base_cursor = cons->cursor; /* only writer is us. */ + uint64_t base_gen; + size_t base_loc = base_cursor & mask; + size_t consumed; + + base_gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen); + if ((int64_t)(base_gen - base_cursor) < 0) { + /* the queue is empty. */ + return 0; + } + + for (consumed = 0; consumed < n; consumed++) { + uint64_t cursor = base_cursor + consumed; + uint64_t gen; + size_t loc = cursor & mask; + + gen = (uint64_t)ck_pr_load_ptr(&buf[loc].gen); + ck_pr_fence_load(); + dst[consumed] = ck_pr_load_ptr(&buf[loc].value); + if (gen != cursor) { + break; + } + } + + ck_pr_fence_load(); + /* everything matched up to here. make sure we didn't lose the race. */ + base_gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen); + return (base_gen == base_cursor) ? (ssize_t)consumed : -1; +} + +size_t +ck_pring_snoop_n(struct ck_pring_snooper *snoop, const struct ck_pring *ring, + uintptr_t *dst, size_t n) +{ + ssize_t ret; + + if (n == 0) { + return 0; + } + + if (n > snoop->cons.mask) { + n = snoop->cons.mask + 1; + } + + for (;;) { + struct ck_pring_consumer *cons = &snoop->cons; + uint64_t cursor = cons->cursor; + uint64_t read_limit = cons->read_limit; + size_t remaining = read_limit - cursor; + + if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) { + remaining = ck_pring_consumer_update_limit(cons, ring); + if (remaining == 0) { + return remaining; + } + } + + n = (n > remaining) ? 
remaining : n; + ret = ck_pring_snoop_n_inner(snoop, dst, n); + if (ret >= 0) { + break; + } + + n = (n + 1) / 2; + if (!snoop_update_cursor(snoop, ring, false)) { + ret = 0; + break; + } + } + + snoop->cons.cursor = snoop->cons.cursor + (size_t)ret; + return (size_t)ret; +}
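+
+/*
+ * Usage sketch for snooping (illustrative; drain() stands in for
+ * whatever the snooper does with the values, and the empty [0, 0)
+ * dependency range means the snooper tracks no other consumer):
+ *
+ *	struct ck_pring_snooper snoop;
+ *	uintptr_t values[64];
+ *	size_t n;
+ *
+ *	ck_pring_snoop_init(&snoop, ring, 0, 0);
+ *	n = ck_pring_snoop_n(&snoop, ring, values, 64);
+ *	for (size_t i = 0; i < n; i++)
+ *		drain(values[i]);
+ */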