initial commit of ck_pring: non-blocking *P*C circular buffer/disruptor

ck_pring
Paul Khuong 7 years ago
parent 0d1e86d18e
commit f9eb3fb479

@ -0,0 +1,146 @@
#ifndef CK_PRING_H
#define CK_PRING_H
#include <ck_malloc.h>
#include <ck_pring/common.h>
#include <ck_pring/dequeue.h>
#include <ck_pring/enqueue.h>
#include <ck_pring/snoop.h>
/**
* Lock-free *P*C disruptor for pointer-sized values.
*
* The consumer and producer sides may be independently specialized to
* the single producer/consumer case.
*
* On my 2.2 GHz Haswell laptop, the best-case inverse throughput for:
* - SP is ~4 cycles/enqueue;
* - SC is ~5 cycles/dequeue;
* - MP is ~24 cycles/enqueue;
* - MC is ~27 cycles/dequeue.
*
* Consumption is slightly slower than production, but consumption can
* be batched.
*
* The ring buffer for the pointer-sized values is an array of
* struct ck_pring_elt, pairs of:
* - generation counter;
* - value.
*
* The generation counter prevents ABA: when a producer is slow, it's
* hard to tell if a ring buffer element is *still* empty, or *newly*
* empty.
*
* This ring buffer attempts to minimise the communication (cache
* coherency traffic) between classes of threads (producers and
* consumers). Producers access the `prod` field, read/write to the
* ring buffer, and only read the `cons` field to refresh their cache
* of the consumer(s) cursor. Consumers access the `cons` field, and
* read the ring buffer; they *never* access the `prod` field.
*
* Production thus mostly incurs traffic for communications between
* producers (hard to avoid), and any compulsory traffic on the ring
* buffer (if the buffer is large enough, this is only between
* producers). Producers only interact with consumers when their
* snapshot of the consumer cursor hints that the buffer might be
* full. If consumers keep up with producers, this should only happen
* ~once every buffer_size enqueues.
*
* Consumption only incurs traffic for communications between
* consumers (racing on the consumption cursor), and any compulsory
* traffic to read the ring buffer (negligible with a large enough
* buffer).
*
* Producers race along an unbounded sequence of generation counters
* (64 bit pointers) to atomically acquire empty cells and fill them
* with their value. The actual storage for this unbounded sequence
* is the ring buffer: the cell for sequence id x is `x % ring_size`.
*
* The cell is available if its generation value is < the generation
* counter we wish to acquire. The cell must also have been released
* by the consumer(s). The producer struct has a cache of the
* consumer(s)'s cursor; any generation value strictly less than that
* cursor is fair game. Of course, the cache may be stale (but must
* never be ahead of the actual value); when producers notice that the
* cached consumer cursor is too low, they update it before exiting
* with failure (full ring).
*
* The linear search over an unbounded sequence of generation counters
* isn't practical. Producers maintain an approximate cache of the
* next free generation counter: when a producer succeeds at producing
* a value for sequence counter `gen`, it (racily) stores `gen + 1` in
* prod.cursor. Producers thus begin their search at `prod.cursor`.
* That search is bounded: it can't go further than the consumer's
* cursor + ring_size - 1. Producers will only retry if the
* consumer's cursor has moved ahead, in which case the system has
* made global progress.
*
* Concurrent production uses a double-wide CAS (CMPXCHG16B) to
* atomically acquire/update the generation counter and set the value
* (we only need single compare / double-wide set), which trivially
* guarantees visibility (on TSO). For the SP case, we must be
* careful to write the value before publishing it by updating the
* generation.
*
* Consuming is easier. The consumer(s) maintain a cursor for the
* next sequence value to consume. The storage for that sequence
* is always at `cursor % ring_size`. They simply have to wait
* until that cell is populated with the correct sequence value,
* and update/race on cons.cursor.
*
* The value in cons.cursor imposes a limit on the largest generation
* that could be produced, so it's safe to:
* 1. read the generation;
* 2. read the value;
* 3. update cons.cursor.
*
* If the generation matches at step 1, it must also match at step 2,
* since it can only increase after step 3. This assumes fences
* between each step... fences that are happily no-ops on TSO.
*/
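/*
 * Minimal usage sketch (single producer, single consumer), assuming a
 * caller-provided struct ck_malloc `allocator`; the element buffer
 * should be zero-initialized and its element count a power of two:
 *
 *	static struct ck_pring_elt buf[64];
 *	struct ck_pring *ring;
 *	uintptr_t value;
 *
 *	ring = ck_pring_create(&allocator, 1, buf, 64);
 *	if (ck_pring_senqueue(ring, 42)) {
 *		value = ck_pring_sdequeue(ring, 0);
 *		// value == 42; sdequeue returns 0 on an empty ring.
 *	}
 *
 *	ck_pring_destroy(&allocator, ring);
 */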
/**
* Size in bytes for a pring with n_consumer consumers.
*/
size_t
ck_pring_allocation_size(size_t n_consumer);
/**
* Initialize a pring for n_consumer consumers, backed by a buffer of
* the given (power-of-two) element count.
*/
void
ck_pring_init(struct ck_pring *, size_t n_consumer, struct ck_pring_elt *, size_t);
/**
* Allocate a pring for n consumers, with buf of bufsz elements.
*
* If consumers have dependencies, the caller is expected to set up
* the dependency begin/end half-open ranges: a consumer block with
* dependency [begin, end) will only process items after they have
* been consumed by consumer ids [begin, end).
*/
struct ck_pring *
ck_pring_create(struct ck_malloc *,
size_t n_consumer, struct ck_pring_elt *buf, size_t bufsz);
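/*
 * Dependency setup sketch: the header exposes no dedicated helper, so a
 * caller would presumably write the half-open range directly (fields
 * declared in ck_pring/common.h) before any traffic flows.  Reusing the
 * allocator and buffer from the sketch above, consumer 1 may not
 * overtake consumer 0:
 *
 *	struct ck_pring *ring = ck_pring_create(&allocator, 2, buf, 64);
 *	struct ck_pring_consumer *chained = ck_pring_consumer_by_id(ring, 1);
 *
 *	chained->dependency_begin = 0;
 *	chained->dependency_end = 1;	(depends on consumer ids [0, 1))
 */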
/**
* Deallocate the pring *but not the buffer*.
*/
void
ck_pring_destroy(struct ck_malloc *, struct ck_pring *);
static inline size_t
ck_pring_size(const struct ck_pring *);
struct ck_pring_elt *
ck_pring_buffer(const struct ck_pring *);
/**
* Inline implementation.
*/
static inline size_t
ck_pring_size(const struct ck_pring *ring)
{
return 1 + ring->prod.mask;
}
#endif /* !CK_PRING_H */

@ -0,0 +1,138 @@
#ifndef PRING_COMMON_H
#define PRING_COMMON_H
#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
struct ck_pring_elt {
char *gen;
uintptr_t value;
} CK_CC_ALIGN(2 * sizeof(void *));
/**
* State for producers.
*
* consumer_snap is a lower bound on the oldest consumer cursor.
* We must not write past consumer_snap + mask. It is monotonic
* in the SP case, best effort otherwise.
*
* cursor is a lower bound on the next empty element. It is exact
* (and thus monotonic) in the single-producer case, best effort for
* MP.
*/
struct ck_pring_producer {
uint64_t consumer_snap; /* <= cons.cursor, accessed by prods. */
uint64_t cursor; /* <= real prod.cursor, accessed by prods & snoops. */
struct ck_pring_elt *buf; /* write once */
uint64_t mask; /* write once */
size_t n_consumer;
uintptr_t dummy;
};
/**
* State for consumers.
*
* cursor is exact (and thus monotonic): the index of the next value
* we wish to read. Any sequence < cursor is safe to overwrite (which
* lets writers go up to (cursor - 1 + size) = cursor + mask).
*
* read_limit is a lower bound on the exclusive upper end of the range
* that is safe to read (i.e., we can always read any non-empty
* sequence < read_limit). Normal consumers, without dependencies,
* mostly ignore this field (it is set to arbitrarily high values).
* Consumers with dependencies use it to make sure they never overtake
* their parents: at any time, for any parent, read_limit <= parent.cursor.
* read_limit is monotonic for SC, best effort for MC.
*
* dependency_begin, end mark the half-open range of this consumer's
* parents.
*/
struct ck_pring_consumer {
const struct ck_pring_elt *buf; /* write-once. */
uint64_t mask; /* write-once. */
uint64_t cursor; /* written by consumers, read by all. */
uint64_t read_limit; /* private. cursor < read_limit. */
uint32_t dependency_begin; /* write once */
uint32_t dependency_end; /* write once */
};
/**
* A snooper is a consumer hidden from the rest of the system.
* Instead of relying on cursor to protect reads, snoopers must check
* the generation counter on ring buffer elements.
*/
struct ck_pring_snooper {
struct ck_pring_consumer cons;
};
struct ck_pring_consumer_block {
struct ck_pring_consumer cons;
char padding[CK_MD_CACHELINE - sizeof(struct ck_pring_consumer)];
} CK_CC_CACHELINE;
/**
* Pack more consumer blocks immediately after ck_pring for
* multi-consumer allocations.
*/
struct ck_pring {
struct ck_pring_producer prod;
char padding[CK_MD_CACHELINE - sizeof(struct ck_pring_producer)];
struct ck_pring_consumer_block cons;
} CK_CC_CACHELINE;
#define CK_PRING_INIT(BUF) \
CK_PRING_INIT_((BUF), \
CK_PRING_PO2_(sizeof(BUF) / sizeof(struct ck_pring_elt)), \
1)
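/*
 * Static initialization sketch: CK_PRING_INIT derives the size from the
 * array, which must have a power-of-two element count (enforced at
 * compile time by CK_PRING_PO2_ below), e.g.:
 *
 *	static struct ck_pring_elt buffer[128];
 *	static struct ck_pring ring = CK_PRING_INIT(buffer);
 */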
/**
* Inline internals.
*/
/**
* Return the data block for consumer index (< n_consumer).
*/
static CK_CC_FORCE_INLINE struct ck_pring_consumer *
ck_pring_consumer_by_id(struct ck_pring *ring, size_t index)
{
struct ck_pring_consumer_block *consumers = &ring->cons;
/* index should only be > 0 with a heap-allocated tail of consumers. */
return &consumers[index].cons;
}
/**
* Update the read limit for the consumer. When there are no
* dependencies, the limit is pushed far ahead so that this is
* called very rarely.
*
* Return the new capacity if it is strictly positive, 0 otherwise.
*/
uintptr_t
ck_pring_consumer_update_limit(struct ck_pring_consumer *,
const struct ck_pring *);
#define CK_PRING_INIT_(BUF, SIZE, N_CONSUMER) \
{ \
.prod = { \
.consumer_snap = (SIZE), \
.cursor = (SIZE), \
.buf = (BUF), \
.mask = (SIZE) - 1, \
.n_consumer = (N_CONSUMER) \
}, \
.cons = { \
.cons = { \
.buf = (BUF), \
.mask = (SIZE) - 1, \
.cursor = (SIZE) \
} \
} \
}
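/*
 * CK_PRING_PO2_(X) evaluates to X when X is a nonzero power of two;
 * otherwise the array size below is negative and compilation fails.
 */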
#define CK_PRING_PO2_(X) \
((X) * sizeof(char[2 * !!((X) > 0 && ((X) & ((X) - 1)) == 0) - 1]))
#endif /* !PRING_COMMON_H */

@ -0,0 +1,332 @@
#ifndef PRING_DEQUEUE_H
#define PRING_DEQUEUE_H
#include <ck_pring/common.h>
/**
* Approximately how many entries are available for consumption. Only
* useful if dependencies are active.
*/
static inline size_t
ck_pring_consume_capacity(struct ck_pring *, size_t index);
/**
* Dequeue a value from a single-threaded consumer block.
*
* Return 0 if there is no new value in the pring.
*/
static inline uintptr_t
ck_pring_sdequeue(struct ck_pring *, size_t index);
/**
* Read a value from a single-threaded consumer block.
*
* Return 0 if there is no new value in the pring.
*/
static inline uintptr_t
ck_pring_sread(struct ck_pring *, size_t index);
/**
* Consume the last value returned by sread for a single-thread consumer block.
*/
static inline void
ck_pring_sconsume(struct ck_pring *, size_t index);
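/*
 * Peek/commit sketch for a pring `ring` and consumer index 0: sread
 * returns the next value without releasing the slot, so the consumer
 * can finish using the value before letting producers overwrite it
 * (handle() is a stand-in for caller code):
 *
 *	uintptr_t value = ck_pring_sread(ring, 0);
 *
 *	if (value != 0) {
 *		handle(value);
 *		ck_pring_sconsume(ring, 0);
 *	}
 */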
/**
* Dequeue up to n values from a single-thread consumer block.
*
* Return the number of values read and written to dst.
*/
size_t
ck_pring_sdequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);
/**
* Read up to n values from a single-thread consumer block.
*
* Return the number of values read and written to dst.
*/
size_t
ck_pring_sread_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);
/**
* Consume the last n values returned by sread_n.
*/
static inline void
ck_pring_sconsume_n(struct ck_pring *, size_t index, size_t n);
/**
* Dequeue a value from a multi-consumer block.
*
* Return 0 if there is no new value.
*/
static inline uintptr_t
ck_pring_mdequeue(struct ck_pring *, size_t index);
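/*
 * Multi-consumer counterparts of the calls above; a short summary,
 * inferred from the implementation:
 *  - mtrydequeue is mdequeue without internal retries; it may fail
 *    spuriously under contention.
 *  - mread/mtryread return the next value and report its generation in
 *    OUT_gen without consuming it; mconsume commits that read by
 *    advancing the shared cursor from gen (by n for the _n variants).
 *  - the _n variants handle up to n values at once and return the
 *    number of values read or consumed.
 */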
static inline uintptr_t
ck_pring_mtrydequeue(struct ck_pring *, size_t index);
static inline uintptr_t
ck_pring_mread(struct ck_pring *, size_t index, uint64_t *OUT_gen);
static inline uintptr_t
ck_pring_mtryread(struct ck_pring *, size_t index, uint64_t *OUT_gen);
static inline bool
ck_pring_mconsume(struct ck_pring *, size_t index, uint64_t gen);
static inline size_t
ck_pring_mdequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);
static inline size_t
ck_pring_mtrydequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);
static inline size_t
ck_pring_mread_n(struct ck_pring *, size_t index,
uintptr_t *dst, size_t n, uint64_t *OUT_gen);
static inline size_t
ck_pring_mtryread_n(struct ck_pring *, size_t index,
uintptr_t *dst, size_t n, uint64_t *OUT_gen);
static inline bool
ck_pring_mconsume_n(struct ck_pring *, size_t index, uint64_t gen, size_t n);
/**
* Inline implementation.
*/
static inline size_t
ck_pring_consume_capacity(struct ck_pring *ring, size_t index)
{
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
uint64_t cursor = cons->cursor;
uint64_t limit = cons->read_limit;
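/*
 * The signed difference is a wraparound-safe "cursor >= limit" test;
 * both counters only ever increase.
 */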
if (CK_CC_UNLIKELY((int64_t)(cursor - limit) >= 0)) {
return ck_pring_consumer_update_limit(cons, ring);
}
return limit - cursor;
}
uintptr_t ck_pring_sdequeue_slow(struct ck_pring *, size_t);
static inline uintptr_t
ck_pring_sdequeue(struct ck_pring *ring, size_t index)
{
struct ck_pring_elt snap;
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t cursor = cons->cursor; /* only writer is us. */
size_t loc = cursor & mask;
if (CK_CC_UNLIKELY((int64_t)(cursor - cons->read_limit) >= 0)) {
/*
* This only happens with dependencies, when consumers
* catch up to the parents. This will always be
* complex, and I don't feel bad about slowing down
* consumers that are going too fast.
*/
return ck_pring_sdequeue_slow(ring, index);
}
/*
* We know where the next element we wish to consume lives.
* Load its generation *before* its value. If the generation
* matches our cursor, consume the element and return the
* value.
*/
snap.gen = ck_pr_load_ptr(&buf[loc].gen);
#ifdef assert
assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 &&
"Concurrent dequeue in sdequeue?");
#endif
ck_pr_fence_load();
/* read gen before value. cursor is always an upper bound on gen. */
snap.value = ck_pr_load_ptr(&buf[loc].value);
if (CK_CC_UNLIKELY((uint64_t)snap.gen != cursor)) {
/*
* This will tend to be either always false (normal
* operations) or always true (queue is empty); a cmov
* would be easy but would be slightly slower than a
* predictable branch in both cases.
*/
return 0;
}
ck_pr_fence_load_store();
/*
* Producers will read cons.cursor. Make sure to consume the
* cell's value before releasing, and make sure cursor is safe
* to read by other threads.
*/
ck_pr_store_64(&cons->cursor, cursor + 1);
return snap.value;
}
uintptr_t ck_pring_sread_slow(struct ck_pring *, size_t);
static inline uintptr_t
ck_pring_sread(struct ck_pring *ring, size_t index)
{
struct ck_pring_elt snap;
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t cursor = cons->cursor;
size_t loc = cursor & mask;
if (CK_CC_UNLIKELY((int64_t)(cursor - cons->read_limit) >= 0)) {
return ck_pring_sread_slow(ring, index);
}
snap.gen = ck_pr_load_ptr(&buf[loc].gen);
#ifdef assert
assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 &&
"Concurrent dequeue in sread?");
#endif
ck_pr_fence_load();
snap.value = ck_pr_load_ptr(&buf[loc].value);
return ((uint64_t)snap.gen == cursor) ? snap.value : 0;
}
static inline void
ck_pring_sconsume(struct ck_pring *ring, size_t index)
{
ck_pring_sconsume_n(ring, index, 1);
return;
}
static inline void
ck_pring_sconsume_n(struct ck_pring *ring, size_t index, size_t n)
{
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
ck_pr_fence_load_store();
ck_pr_store_64(&cons->cursor, cons->cursor + n);
return;
}
uintptr_t
ck_pring_mdequeue_generic(struct ck_pring *ring, size_t index, bool hard);
static inline uintptr_t
ck_pring_mdequeue(struct ck_pring *ring, size_t index)
{
return ck_pring_mdequeue_generic(ring, index, true);
}
static inline uintptr_t
ck_pring_mtrydequeue(struct ck_pring *ring, size_t index)
{
return ck_pring_mdequeue_generic(ring, index, false);
}
uintptr_t ck_pring_mread_slow(struct ck_pring *, size_t, uint64_t *, bool);
static inline uintptr_t
ck_pring_mread_generic(struct ck_pring *ring, size_t index,
uint64_t *OUT_gen, bool hard)
{
struct ck_pring_elt snap;
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t cursor = ck_pr_load_64(&cons->cursor);
uint64_t read_limit = ck_pr_load_64(&cons->read_limit);
size_t loc = cursor & mask;
if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) {
goto slow;
}
snap.gen = ck_pr_load_ptr(&buf[loc].gen);
ck_pr_fence_load();
snap.value = ck_pr_load_ptr(&buf[loc].value);
if (CK_CC_UNLIKELY((int64_t)((uint64_t)snap.gen - cursor) < 0)) {
return 0;
}
ck_pr_fence_load();
if (CK_CC_UNLIKELY((uint64_t)ck_pr_load_ptr(&buf[loc].gen) != cursor)) {
goto slow;
}
*OUT_gen = cursor;
return snap.value;
slow:
return ck_pring_mread_slow(ring, index, OUT_gen, hard);
}
static inline uintptr_t
ck_pring_mread(struct ck_pring *ring, size_t index, uint64_t *OUT_gen)
{
return ck_pring_mread_generic(ring, index, OUT_gen, true);
}
static inline uintptr_t
ck_pring_mtryread(struct ck_pring *ring, size_t index, uint64_t *OUT_gen)
{
return ck_pring_mread_generic(ring, index, OUT_gen, false);
}
static inline bool
ck_pring_mconsume(struct ck_pring *ring, size_t index, uint64_t gen)
{
return ck_pring_mconsume_n(ring, index, gen, 1);
}
size_t
ck_pring_mdequeue_n_generic(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n, bool hard);
static inline size_t
ck_pring_mdequeue_n(struct ck_pring *ring, size_t index, uintptr_t *dst, size_t n)
{
return ck_pring_mdequeue_n_generic(ring, index, dst, n, true);
}
static inline size_t
ck_pring_mtrydequeue_n(struct ck_pring *ring, size_t index, uintptr_t *dst, size_t n)
{
return ck_pring_mdequeue_n_generic(ring, index, dst, n, false);
}
static inline bool
ck_pring_mconsume_n(struct ck_pring *ring, size_t index, uint64_t gen, size_t n)
{
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
ck_pr_fence_load_store();
return ck_pr_cas_64(&cons->cursor, gen, gen + n);
}
size_t
ck_pring_mread_n_generic(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n, uint64_t *OUT_gen, bool hard);
static inline size_t
ck_pring_mread_n(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n, uint64_t *OUT_gen)
{
return ck_pring_mread_n_generic(ring, index,
dst, n, OUT_gen, true);
}
static inline size_t
ck_pring_mtryread_n(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n, uint64_t *OUT_gen)
{
return ck_pring_mread_n_generic(ring, index,
dst, n, OUT_gen, false);
}
#endif /* !PRING_DEQUEUE_H */

@ -0,0 +1,143 @@
#ifndef PRING_ENQUEUE_H
#define PRING_ENQUEUE_H
#include <ck_pring/common.h>
/**
* Return an approximation of the remaining capacity in the ring.
*
* Exact for single producer.
*/
static inline size_t
ck_pring_enqueue_capacity(struct ck_pring *);
/**
* Attempt to enqueue one value in a single-producer pring.
*
* Return true iff success.
*/
static inline bool
ck_pring_senqueue(struct ck_pring *, uintptr_t);
/**
* Attempt to enqueue one value in a single-producer pring.
*
* Return true iff success, in which case old_value was overwritten
* with the value previously in the ring buffer element. The value
* is unspecified on failure.
*/
static inline bool
ck_pring_senqueue_val(struct ck_pring *, uintptr_t, uintptr_t *old_value);
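/*
 * Recycling sketch: on success, *old_value holds whatever the
 * overwritten slot contained, which lets a caller reuse heap objects
 * instead of freeing them (msg_pool_put and struct msg are stand-ins
 * for caller code):
 *
 *	uintptr_t recycled;
 *
 *	if (ck_pring_senqueue_val(ring, (uintptr_t)msg, &recycled) &&
 *	    recycled != 0) {
 *		msg_pool_put((struct msg *)recycled);
 *	}
 */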
/**
* Attempt to enqueue up to n values in a single-producer pring.
*
* Return the number of values enqueued; values[0 .. ret) is overwritten
* with the values previously stored in the ring buffer elements.
*/
size_t
ck_pring_senqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n);
/**
* Attempt to enqueue one value in a multi-producer pring.
*
* Return true iff success.
*/
static inline bool
ck_pring_menqueue(struct ck_pring *, uintptr_t);
/**
* Attempt to enqueue one value in a multi-producer pring.
*
* Return true iff success, in which case old_value was overwritten
* with the value previously in the ring buffer element. The value
* is unspecified on failure.
*/
bool
ck_pring_menqueue_val(struct ck_pring *, uintptr_t, uintptr_t *old_value);
/**
* Attempt to enqueue up to n values in a multi-producer pring.
*
* Return the number of values enqueued; values[0 .. ret) is overwritten
* with the values previously stored in the ring buffer elements.
*/
size_t
ck_pring_menqueue_n(struct ck_pring *, uintptr_t *values, size_t n);
/**
* Inline implementation.
*/
static inline bool
ck_pring_menqueue(struct ck_pring *ring, uintptr_t value)
{
uintptr_t buf;
(void)buf;
return ck_pring_menqueue_val(ring, value, &buf);
}
size_t
ck_pring_enqueue_capacity_slow(struct ck_pring *ring);
static inline size_t
ck_pring_enqueue_capacity(struct ck_pring *ring)
{
uint64_t mask = ring->prod.mask;
uint64_t consumer_snap = ring->prod.consumer_snap;
uint64_t cursor = ring->prod.cursor;
if (CK_CC_UNLIKELY(cursor - consumer_snap > mask)) {
return ck_pring_enqueue_capacity_slow(ring);
}
return (consumer_snap + mask + 1) - cursor;
}
static inline bool
ck_pring_senqueue(struct ck_pring *ring, uintptr_t value)
{
return ck_pring_senqueue_val(ring, value, &ring->prod.dummy);
}
bool
ck_pring_senqueue_val_slow(struct ck_pring *ring, uintptr_t value,
uintptr_t *old_value);
static inline bool
ck_pring_senqueue_val(struct ck_pring *ring, uintptr_t value,
uintptr_t *old_value)
{
struct ck_pring_elt *buf = ring->prod.buf;
struct ck_pring_elt *dst;
uint64_t mask = ring->prod.mask;
/* only writer to prod.* is us. */
uint64_t consumer_snap = ring->prod.consumer_snap;
uint64_t cursor = ring->prod.cursor;
size_t loc = cursor & mask;
/*
* We know where we want to write. Make sure our snapshot of
* the consumer cursor lets us write there (or update the
* snapshot), and write the value *before* publishing
* the new generation.
*/
dst = &buf[loc];
#ifdef __GNUC__
__asm__("" : "+r"(dst)); /* compute dst before the branch. */
#endif
if (CK_CC_UNLIKELY((cursor - consumer_snap) > mask)) {
return ck_pring_senqueue_val_slow(ring, value, old_value);
}
/* We're not too far. do the write! */
*old_value = dst->value;
ck_pr_store_ptr((void **)&dst->value, (void *)value);
ck_pr_fence_store();
ck_pr_store_ptr(&dst->gen, (void *)cursor);
ck_pr_fence_store();
ck_pr_store_64(&ring->prod.cursor, cursor + 1);
return true;
}
#endif /* !PRING_ENQUEUE_H */

@ -0,0 +1,93 @@
#ifndef PRING_SNOOP_H
#define PRING_SNOOP_H
#include <ck_pring/common.h>
/**
* Initialize a snooper block. Snoopers are consumers that do not
* block producers. dep_begin, dep_end specifies a range of consumer
* ids not to overtake.
*/
void
ck_pring_snoop_init(struct ck_pring_snooper *, const struct ck_pring *,
uint32_t dep_begin, uint32_t dep_end);
/**
* Approximately how many entries are available for snooping. Only
* useful if dependencies are active.
*/
static inline size_t
ck_pring_snoop_capacity(struct ck_pring_snooper *, const struct ck_pring *);
/**
* Snoop the next value from the pring.
*
* Return 0 on failure.
*/
static inline uintptr_t
ck_pring_snoop(struct ck_pring_snooper *, const struct ck_pring *);
/**
* Snoop up to n values from the pring.
*
* Return the number of values snooped and written to dst.
*/
size_t
ck_pring_snoop_n(struct ck_pring_snooper *, const struct ck_pring *,
uintptr_t *dst, size_t n);
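/*
 * Snooping sketch for a pring `ring`: a snooper observes traffic
 * without ever blocking producers, so it may skip values that get
 * overwritten before it reads them (observe() is a stand-in for caller
 * code):
 *
 *	struct ck_pring_snooper snoop;
 *	uintptr_t value;
 *
 *	ck_pring_snoop_init(&snoop, ring, 0, 0);	(no dependencies)
 *	while ((value = ck_pring_snoop(&snoop, ring)) != 0)
 *		observe(value);
 */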
/**
* Inline implementation.
*/
static inline size_t
ck_pring_snoop_capacity(struct ck_pring_snooper *snoop,
const struct ck_pring *ring)
{
uint64_t cursor = snoop->cons.cursor;
uint64_t limit = snoop->cons.read_limit;
if (CK_CC_UNLIKELY((int64_t)(cursor - limit) >= 0)) {
return ck_pring_consumer_update_limit(&snoop->cons, ring);
}
return limit - cursor;
}
uintptr_t
ck_pring_snoop_slow(struct ck_pring_snooper *snoop,
const struct ck_pring *ring);
static inline uintptr_t
ck_pring_snoop(struct ck_pring_snooper *snoop, const struct ck_pring *ring)
{
struct ck_pring_elt snap;
const struct ck_pring_elt *buf = snoop->cons.buf;
uint64_t mask = snoop->cons.mask;
uint64_t cursor = snoop->cons.cursor;
size_t loc = cursor & mask;
if (CK_CC_UNLIKELY((int64_t)(cursor - snoop->cons.read_limit) >= 0)) {
goto slow;
}
snap.gen = ck_pr_load_ptr(&buf[loc].gen);
ck_pr_fence_load();
/* read gen before value. cursor is an upper bound on gen. */
snap.value = ck_pr_load_ptr(&buf[loc].value);
if (CK_CC_UNLIKELY((int64_t)((uint64_t)snap.gen - cursor) < 0)) {
/* gen is too old. queue is still empty. */
return 0;
}
ck_pr_fence_load();
if (CK_CC_UNLIKELY((uint64_t)ck_pr_load_ptr(&buf[loc].gen) != cursor)) {
/* gen doesn't match and/or cursor is out of date; try again. */
goto slow;
}
snoop->cons.cursor = cursor + 1;
return snap.value;
slow:
return ck_pring_snoop_slow(snoop, ring);
}
#endif /* !PRING_SNOOP_H */

@ -16,7 +16,11 @@ OBJECTS=ck_barrier_centralized.o \
ck_hp.o \
ck_hs.o \
ck_rhs.o \
ck_array.o
ck_array.o \
ck_pring.o \
ck_pring_dequeue.o \
ck_pring_enqueue.o \
ck_pring_snoop.o
all: $(ALL_LIBS)
@ -59,6 +63,11 @@ ck_barrier_tournament.o: $(SDIR)/ck_barrier_tournament.c
ck_barrier_mcs.o: $(SDIR)/ck_barrier_mcs.c
$(CC) $(CFLAGS) -c -o $(TARGET_DIR)/ck_barrier_mcs.o $(SDIR)/ck_barrier_mcs.c
ck_pring.o: $(SDIR)/ck_pring.c $(INCLUDE_DIR)/ck_pring.h $(INCLUDE_DIR)/ck_pring/common.h
$(CC) $(CFLAGS) -c -o $(TARGET_DIR)/ck_pring.o $(SDIR)/ck_pring.c
ck_pring_%.o: $(SDIR)/ck_pring/%.c $(INCLUDE_DIR)/ck_pring.h $(INCLUDE_DIR)/ck_pring/common.h $(INCLUDE_DIR)/ck_pring/%.h
$(CC) $(CFLAGS) -c -o $@ $<
clean:
rm -rf $(TARGET_DIR)/*.dSYM $(TARGET_DIR)/*~ $(TARGET_DIR)/*.o \
$(OBJECTS) $(TARGET_DIR)/libck.a $(TARGET_DIR)/libck.so

@ -0,0 +1,109 @@
#define CK_PRING_IMPL
#include <assert.h>
#include <ck_pring.h>
#include <string.h>
size_t
ck_pring_allocation_size(size_t n_consumer)
{
size_t consumer_size;
size_t extra = 0;
if (n_consumer > 0) {
extra = n_consumer - 1;
}
consumer_size = extra * sizeof(struct ck_pring_consumer_block);
return sizeof(struct ck_pring) + consumer_size;
}
void
ck_pring_init(struct ck_pring *ring, size_t n_consumer,
struct ck_pring_elt *buf, size_t size)
{
struct ck_pring_consumer_block *next = (void *)(ring + 1);
assert(size > 0 && (size & (size - 1)) == 0);
*ring = (struct ck_pring) CK_PRING_INIT_(buf, size, n_consumer);
for (size_t i = 1; i < n_consumer; i++) {
next[i - 1].cons = ring->cons.cons;
}
ck_pr_fence_store();
return;
}
struct ck_pring *
ck_pring_create(struct ck_malloc *ck_malloc,
size_t n_consumer, struct ck_pring_elt *buf, size_t bufsz)
{
struct ck_pring *ret;
size_t ringsz;
ringsz = ck_pring_allocation_size(n_consumer);
ret = ck_malloc->malloc(ringsz);
if (ret == NULL) {
return NULL;
}
memset(ret, 0, ringsz);
ck_pring_init(ret, n_consumer, buf, bufsz);
return ret;
}
void
ck_pring_destroy(struct ck_malloc *ck_malloc, struct ck_pring *ring)
{
size_t ringsz;
if (ring == NULL) {
return;
}
ringsz = ck_pring_allocation_size(ring->prod.n_consumer);
ck_malloc->free(ring, ringsz, false);
return;
}
struct ck_pring_elt *
ck_pring_buffer(const struct ck_pring *ring)
{
return ring->prod.buf;
}
/* Declared in pring_common.h */
uintptr_t
ck_pring_consumer_update_limit(struct ck_pring_consumer *consumer,
const struct ck_pring *ring)
{
const struct ck_pring_consumer_block *consumers = &ring->cons;
uint64_t old_limit = ck_pr_load_64(&consumer->read_limit);
uint64_t limit = old_limit + ((uint64_t)1 << 60);
uint64_t capacity;
size_t dep_begin = consumer->dependency_begin;
size_t dep_end = consumer->dependency_end;
/* Common case: no dependency. */
if (CK_CC_LIKELY(dep_begin >= dep_end)) {
capacity = consumer->mask + 1;
ck_pr_store_64(&consumer->read_limit, limit);
return capacity;
}
for (size_t i = dep_end; i --> dep_begin; ) {
const struct ck_pring_consumer *current = &consumers[i].cons;
uint64_t current_cursor;
size_t begin = current->dependency_begin;
size_t skip;
current_cursor = ck_pr_load_64(&current->cursor);
skip = (current->dependency_end >= i) ? begin : i;
if ((int64_t)(current_cursor - limit) < 0) {
limit = current_cursor;
}
i = (skip < i) ? skip : i;
}
capacity = limit - ck_pr_load_64(&consumer->cursor);
ck_pr_store_64(&consumer->read_limit, limit);
return ((int64_t)capacity > 0) ? capacity : 0;
}

@ -0,0 +1,312 @@
#include <assert.h>
#include <ck_pring/dequeue.h>
#include <sys/types.h>
uintptr_t
ck_pring_sdequeue_slow(struct ck_pring *ring, size_t index)
{
uintptr_t buf[1] = { 0 };
uintptr_t n;
n = ck_pring_sdequeue_n(ring, index, buf, 1);
return buf[0] & -n;
}
uintptr_t
ck_pring_sread_slow(struct ck_pring *ring, size_t index)
{
uintptr_t buf[1] = { 0 };
uintptr_t n;
n = ck_pring_sread_n(ring, index, buf, 1);
return buf[0] & -n;
}
size_t
ck_pring_sdequeue_n(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n)
{
size_t read;
read = ck_pring_sread_n(ring, index, dst, n);
ck_pring_sconsume_n(ring, index, read);
return read;
}
size_t
ck_pring_sread_n(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n)
{
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t base_cursor = cons->cursor; /* only writer is us. */
uint64_t read_limit = ck_pr_load_64(&cons->read_limit);
size_t capacity = read_limit - base_cursor;
size_t consumed;
if (CK_CC_UNLIKELY((int64_t)(base_cursor - read_limit) >= 0)) {
capacity = ck_pring_consumer_update_limit(cons, ring);
if (capacity == 0) {
return capacity;
}
}
n = (n > capacity) ? capacity : n;
/*
* No check for n == 0. This situation should be rare, and
* the code below correctly handles it.
*/
/*
* See if we can immediately read n values. We know values
* from base_cursor onward have not been overwritten. We only
* have to check if the last item we wish to read has been
* produced before copying everything.
*/
{
struct ck_pring_elt snap;
uint64_t last_cursor = base_cursor + n - 1;
size_t last_loc = last_cursor & mask;
snap.gen = ck_pr_load_ptr(&buf[last_loc].gen);
ck_pr_fence_load();
if (CK_CC_UNLIKELY((uint64_t)snap.gen != last_cursor)) {
goto slow;
}
for (size_t i = 0; i < n; i++) {
uint64_t cursor = base_cursor + i;
size_t loc = cursor & mask;
dst[i] = ck_pr_load_ptr(&buf[loc].value);
}
return n;
}
slow:
for (consumed = 0; consumed < n; consumed++) {
struct ck_pring_elt snap;
uint64_t cursor = base_cursor + consumed;
size_t loc = cursor & mask;
snap.gen = ck_pr_load_ptr(&buf[loc].gen);
ck_pr_fence_load();
snap.value = ck_pr_load_ptr(&buf[loc].value);
if ((uint64_t)snap.gen != cursor) {
assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 &&
"Concurrent dequeue in sdequeue?");
break;
}
dst[consumed] = snap.value;
}
return consumed;
}
/**
* Update snap with the value for cursor in the ring buffer.
* Returns a comparator value for generation ?= cursor.
* 0 means we have the value we're looking for.
* Negative means the value is older than expected (the cell
* is empty).
* Positive means the value is younger than expected (we
* were too slow and lost the race).
*/
static inline int64_t
preacquire(struct ck_pring_elt *snap,
const struct ck_pring_elt *buf, uint64_t mask, uint64_t cursor)
{
size_t loc = cursor & mask;
snap->gen = ck_pr_load_ptr(&buf[loc].gen);
ck_pr_fence_load();
snap->value = ck_pr_load_ptr(&buf[loc].value);
return (int64_t)((uint64_t)snap->gen - cursor);
}
uintptr_t
ck_pring_mdequeue_generic(struct ck_pring *ring, size_t index, bool hard)
{
struct ck_pring_elt snap;
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t cursor = ck_pr_load_64(&cons->cursor);
uint64_t read_limit = ck_pr_load_64(&cons->read_limit);
if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) {
uintptr_t ret;
ret = ck_pring_consumer_update_limit(cons, ring);
if (ret == 0) {
return ret;
}
}
/* Fast path, assuming our cursor is up to date. */
{
int64_t ret;
ret = preacquire(&snap, buf, mask, cursor);
if (CK_CC_LIKELY(ret == 0)) {
/*
* The ring buffer element is up to date.
* Attempt to acquire it!
*/
if (CK_CC_LIKELY(ck_pr_cas_64(&cons->cursor,
cursor, cursor + 1))) {
return snap.value;
}
} else if (CK_CC_LIKELY(ret < 0)) {
/*
* The ring buffer element is too old (still
* empty). Fail immediately.
*/
return 0;
}
}
{
uintptr_t arr[1] = { 0 };
uintptr_t n;
n = ck_pring_mdequeue_n_generic(ring, index, arr, 1, hard);
return arr[0] & -n;
}
}
uintptr_t
ck_pring_mread_slow(struct ck_pring *ring, size_t index,
uint64_t *OUT_gen, bool hard)
{
uintptr_t buf[1];
uintptr_t n;
n = ck_pring_mread_n_generic(ring, index, buf, 1, OUT_gen, hard);
return buf[0] & -n;
}
size_t
ck_pring_mdequeue_n_generic(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n, bool hard)
{
for (;;) {
uint64_t gen;
size_t ret;
ret = ck_pring_mread_n_generic(ring, index, dst, n, &gen, hard);
if (ret == 0 || ck_pring_mconsume_n(ring, index, gen, ret)) {
return ret;
}
if (!hard) {
return 0;
}
n = (n + 1) / 2;
}
return 0;
}
size_t
ck_pring_mread_n_generic(struct ck_pring *ring, size_t index,
uintptr_t *dst, size_t n, uint64_t *OUT_gen, bool hard)
{
struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t base_cursor;
uint64_t read_limit;
size_t base_loc;
size_t capacity;
size_t consumed;
retry:
base_cursor = ck_pr_load_64(&cons->cursor);
read_limit = ck_pr_load_64(&cons->read_limit);
base_loc = base_cursor & mask;
capacity = read_limit - base_cursor;
if (CK_CC_UNLIKELY((int64_t)(base_cursor - read_limit) >= 0)) {
capacity = ck_pring_consumer_update_limit(cons, ring);
if (capacity == 0) {
return capacity;
}
}
n = (n > capacity) ? capacity : n;
*OUT_gen = base_cursor;
{
struct ck_pring_elt snap;
uint64_t last_cursor = base_cursor + n - 1;
size_t last_loc = last_cursor & mask;
snap.gen = ck_pr_load_ptr(&buf[last_loc].gen);
ck_pr_fence_load();
if (CK_CC_UNLIKELY((uint64_t)snap.gen != last_cursor)) {
goto slow;
}
for (size_t i = 0; i < n; i++) {
uint64_t cursor = base_cursor + i;
size_t loc = cursor & mask;
dst[i] = ck_pr_load_ptr(&buf[loc].value);
}
ck_pr_fence_load();
if (n <= 1 ||
(uint64_t)ck_pr_load_ptr(&buf[base_loc].gen) == base_cursor) {
return n;
}
if (!hard) {
return 0;
}
/* We started with snap.gen == last_cursor, so we lost a race. */
n = 1;
goto retry;
}
slow:
if (n == 0) {
return 0;
}
for (consumed = 0; consumed < n; consumed++) {
struct ck_pring_elt snap;
uint64_t cursor = base_cursor + consumed;
size_t loc = cursor & mask;
snap.gen = ck_pr_load_ptr(&buf[loc].gen);
ck_pr_fence_load();
snap.value = ck_pr_load_ptr(&buf[loc].value);
if ((uint64_t)snap.gen != cursor) {
break;
}
dst[consumed] = snap.value;
}
if (consumed == 0 && hard) {
uint64_t gen;
gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen);
/* Only retry if we lost the race. */
if ((int64_t)(gen - base_cursor) > 0) {
n = 1;
goto retry;
}
}
return consumed;
}

@ -0,0 +1,314 @@
#include <assert.h>
#include <ck_pring/enqueue.h>
#include <ck_cc.h>
#include <ck_pr.h>
#include <ck_limits.h>
#include <sys/types.h>
static uint64_t
oldest_consumer_snap(const struct ck_pring *ring, uint64_t cursor)
{
const struct ck_pring_consumer_block *consumers = &ring->cons;
size_t n_consumer = ring->prod.n_consumer;
uint64_t ret = cursor;
for (size_t i = n_consumer; i --> 0; ) {
const struct ck_pring_consumer *current = &consumers[i].cons;
uint64_t current_cursor = ck_pr_load_64(&current->cursor);
size_t skip = current->dependency_begin;
if ((int64_t)(current_cursor - ret) < 0) {
ret = current_cursor;
}
/*
* Current cursor includes [begin, end). If end >= i,
* we may skip everything down to and including begin:
* all skipped indices are covered by the current
* cursor.
*/
i = (current->dependency_end >= i) ? skip : i;
}
return ret;
}
size_t
ck_pring_enqueue_capacity_slow(struct ck_pring *ring)
{
uint64_t mask = ring->prod.mask;
uint64_t consumer_snap;
uint64_t cursor = ring->prod.cursor;
consumer_snap = oldest_consumer_snap(ring, cursor);
ck_pr_store_64(&ring->prod.consumer_snap, consumer_snap);
if (cursor - consumer_snap > mask) {
return 0;
}
return (consumer_snap + mask + 1) - cursor;
}
bool
ck_pring_senqueue_val_slow(struct ck_pring *ring, uintptr_t value,
uintptr_t *old_value)
{
uint64_t mask = ring->prod.mask;
uint64_t consumer_snap;
uint64_t cursor = ring->prod.cursor;
consumer_snap = oldest_consumer_snap(ring, cursor);
ring->prod.consumer_snap = consumer_snap;
if (cursor - consumer_snap > mask) {
return false;
}
return ck_pring_senqueue_val(ring, value, old_value);
}
size_t
ck_pring_senqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n)
{
struct ck_pring_elt *buf = ring->prod.buf;
uint64_t mask = ring->prod.mask;
uint64_t base_cursor = ring->prod.cursor;
size_t capacity;
size_t produced;
capacity = ck_pring_enqueue_capacity(ring);
if (n > capacity) {
n = capacity;
}
for (produced = 0; produced < n; produced++) {
struct ck_pring_elt *dst;
uint64_t cursor = base_cursor + produced;
uintptr_t previous;
size_t loc = cursor & mask;
dst = &buf[loc];
previous = dst->value;
ck_pr_store_ptr((void **)&dst->value, (void *)values[produced]);
ck_pr_fence_store();
ck_pr_store_ptr(&dst->gen, (void *)cursor);
values[produced] = previous;
}
ck_pr_fence_store();
ck_pr_store_64(&ring->prod.cursor, base_cursor + produced);
return produced;
}
/**
* Assuming that cursor is safe to overwrite (i.e., is at most mask
* ahead of the read-side cursor), attempt to overwrite an *older*
* record with our new value.
*
* Ret < 0 means success.
* Ret == 0 means we lost the race to update cursor.
* Ret > 0 means we lost the race by (at least) a full revolution
* around the ring buffer.
*/
static int64_t
try_menqueue_one(struct ck_pring_producer *snap,
uintptr_t value,
uint64_t cursor,
uintptr_t *old)
{
struct ck_pring_elt expected, update;
struct ck_pring_elt *buf = snap->buf;
uint64_t actual_gen;
uint64_t mask = snap->mask;
uint64_t loc = cursor & mask;
int64_t ret;
expected.value = ck_pr_load_ptr(&buf[loc].value);
/* no barrier here: the CAS will just fail. */
expected.gen = ck_pr_load_ptr(&buf[loc].gen);
actual_gen = (uint64_t)expected.gen;
ret = (int64_t)(actual_gen - cursor);
if (ret >= 0) {
/* we're trying to replace a fresh record. fail. */
goto late;
}
update.value = value;
update.gen = (void *)cursor;
if (CK_CC_LIKELY(ck_pr_cas_ptr_2_value(&buf[loc], &expected, &update,
&expected))) {
*old = expected.value;
return ret;
}
/*
* if we failed, try again. the dwcas gave us a consistent
* read, so no spurious failure here.
*/
actual_gen = (uint64_t)expected.gen;
ret = (int64_t)(actual_gen - cursor);
if (ret >= 0) {
goto late;
}
if (ck_pr_cas_ptr_2_value(&buf[loc], &expected, &update,
&expected)) {
*old = expected.value;
return ret;
}
actual_gen = (uint64_t)expected.gen;
ret = 0;
late:
/*
* If we're late, we know the next "free" generation value is
* at least one more than the one we just observed.
*/
snap->cursor = actual_gen + 1;
return ret;
}
/**
* Bounded linear search for an empty cell, up to snap->consumer_snap + mask.
* Return true on success, false otherwise.
* Update the snapshot's (write) cursor on failure.
* Update the producer-side cache on success.
*/
static inline bool
try_menqueue(struct ck_pring *ring,
struct ck_pring_producer *snap,
uintptr_t value,
uintptr_t *old)
{
uint64_t consumer_snap = snap->consumer_snap;
uint64_t cursor = snap->cursor;
uint64_t mask = snap->mask;
if ((int64_t)(cursor - consumer_snap) < 0) {
cursor = consumer_snap;
}
for (; (cursor - consumer_snap) <= mask; cursor++) {
int64_t ret;
ret = try_menqueue_one(snap, value, cursor, old);
if (ret > 0) {
/*
* we're really off. break out of here and
* update our snapshots. try_menqueue_one
* already updated snap->cursor.
*/
return false;
}
/*
* Success!
*/
if (ret < 0) {
ck_pr_store_64(&ring->prod.cursor, cursor + 1);
ck_pr_store_64(&ring->prod.consumer_snap,
snap->consumer_snap);
return true;
}
}
snap->cursor = cursor;
return false;
}
bool
ck_pring_menqueue_val(struct ck_pring *ring, uintptr_t value,
uintptr_t *old)
{
struct ck_pring_producer snap;
snap.buf = ring->prod.buf;
snap.mask = ring->prod.mask;
snap.consumer_snap = ck_pr_load_64(&ring->prod.consumer_snap);
snap.cursor = ck_pr_load_64(&ring->prod.cursor);
/*
* Fast path: snap.cursor isn't too far ahead. Immediately
* try to write there.
* We only access the producers' struct and the buffer.
*/
if (CK_CC_LIKELY((snap.cursor - snap.consumer_snap) <= snap.mask)) {
if (CK_CC_LIKELY(
try_menqueue_one(&snap, value, snap.cursor, old) < 0)) {
/* Success: racily update our local cursor and win. */
ck_pr_store_64(&ring->prod.cursor, snap.cursor + 1);
return true;
}
}
/*
* Slow path: update our private snapshot from the producers'
* and consumers' structs, until the consumers' cursor stops
* moving (if that happens, it's really time to fail).
*/
for (;;) {
uint64_t consumer_snap;
uint64_t prod_cursor;
/*
* Linear search for an empty cell that's OK to
* overwrite. On success, nothing else to do:
* try_menqueue updates the producers' cache.
*/
if (try_menqueue(ring, &snap, value, old)) {
return true;
}
prod_cursor = ck_pr_load_64(&ring->prod.cursor);
/*
* prod.cursor is a racy hint. Either update
* our cache or move the global cursor ahead.
*/
if ((int64_t)(prod_cursor - snap.cursor) > 0) {
snap.cursor = prod_cursor;
} else {
ck_pr_store_64(&ring->prod.cursor, snap.cursor);
}
consumer_snap = oldest_consumer_snap(ring, snap.cursor);
/* No progress on the consumer's end. Stop trying. */
if (consumer_snap == snap.consumer_snap) {
uint64_t current_snap;
/* Update the global snap if it's older than ours. */
current_snap = ck_pr_load_64(&ring->prod.consumer_snap);
if ((int64_t)(consumer_snap - current_snap) < 0) {
ck_pr_store_64(&ring->prod.consumer_snap,
consumer_snap);
}
break;
}
snap.consumer_snap = consumer_snap;
}
return false;
}
size_t
ck_pring_menqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n)
{
/* XXX: do this better. */
for (size_t i = 0; i < n; i++) {
uintptr_t current = values[i];
uintptr_t update;
if (!ck_pring_menqueue_val(ring, current, &update)) {
return i;
}
values[i] = update;
}
return n;
}

@ -0,0 +1,134 @@
#include <assert.h>
#include <ck_pring/snoop.h>
#include <sys/types.h>
static bool
snoop_update_cursor(struct ck_pring_snooper *snoop,
const struct ck_pring *ring, bool init)
{
const struct ck_pring_elt *buf = snoop->cons.buf;
uint64_t mask = snoop->cons.mask;
uint64_t cursor = snoop->cons.cursor;
uint64_t new_cursor;
size_t loc = cursor & mask;
if (snoop->cons.dependency_begin < snoop->cons.dependency_end) {
(void)ck_pring_consumer_update_limit(&snoop->cons, ring);
new_cursor = snoop->cons.read_limit - 1;
} else {
new_cursor = (uint64_t)ck_pr_load_ptr(&buf[loc].gen);
}
if (!init && (int64_t)(cursor - new_cursor) >= 0) {
return false;
}
snoop->cons.cursor = new_cursor;
return true;
}
void
ck_pring_snoop_init(struct ck_pring_snooper *snoop, const struct ck_pring *ring,
uint32_t dep_begin, uint32_t dep_end)
{
snoop->cons.buf = ring->prod.buf;
snoop->cons.mask = ring->prod.mask;
snoop->cons.dependency_begin = dep_begin;
snoop->cons.dependency_end = dep_end;
(void)snoop_update_cursor(snoop, ring, true);
return;
}
uintptr_t
ck_pring_snoop_slow(struct ck_pring_snooper *snoop,
const struct ck_pring *ring)
{
uintptr_t ret[1] = { 0 };
uintptr_t n;
n = ck_pring_snoop_n(snoop, ring, ret, 1);
/* return 0 if n == 0, otherwise ret (and n == 1). */
return ret[0] & -n;
}
static ssize_t
ck_pring_snoop_n_inner(struct ck_pring_snooper *snoop, uintptr_t *dst, size_t n)
{
struct ck_pring_consumer *cons = &snoop->cons;
const struct ck_pring_elt *buf = cons->buf;
uint64_t mask = cons->mask;
uint64_t base_cursor = cons->cursor; /* only writer is us. */
uint64_t base_gen;
size_t base_loc = base_cursor & mask;
size_t consumed;
base_gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen);
if ((int64_t)(base_gen - base_cursor) < 0) {
/* the queue is empty. */
return 0;
}
for (consumed = 0; consumed < n; consumed++) {
uint64_t cursor = base_cursor + consumed;
uint64_t gen;
size_t loc = cursor & mask;
gen = (uint64_t)ck_pr_load_ptr(&buf[loc].gen);
ck_pr_fence_load();
dst[consumed] = ck_pr_load_ptr(&buf[loc].value);
if (gen != cursor) {
break;
}
}
ck_pr_fence_load();
/* everything matched up to here. make sure we didn't lose the race. */
base_gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen);
return (base_gen == base_cursor) ? (ssize_t)consumed : -1;
}
size_t
ck_pring_snoop_n(struct ck_pring_snooper *snoop, const struct ck_pring *ring,
uintptr_t *dst, size_t n)
{
ssize_t ret;
if (n == 0) {
return 0;
}
if (n > snoop->cons.mask) {
n = snoop->cons.mask + 1;
}
for (;;) {
struct ck_pring_consumer *cons = &snoop->cons;
uint64_t cursor = cons->cursor;
uint64_t read_limit = cons->read_limit;
size_t remaining = read_limit - cursor;
if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) {
remaining = ck_pring_consumer_update_limit(cons, ring);
if (remaining == 0) {
return remaining;
}
}
n = (n > remaining) ? remaining : n;
ret = ck_pring_snoop_n_inner(snoop, dst, n);
if (ret >= 0) {
break;
}
n = (n + 1) / 2;
if (!snoop_update_cursor(snoop, ring, false)) {
ret = 0;
break;
}
}
snoop->cons.cursor = snoop->cons.cursor + (size_t)ret;
return (size_t)ret;
}