Compare commits
1 Commit
Author | SHA1 | Date |
---|---|---|
Paul Khuong | f9eb3fb479 | 7 years ago |
@ -0,0 +1,146 @@
#ifndef CK_PRING_H
#define CK_PRING_H
#include <ck_malloc.h>
#include <ck_pring/common.h>
#include <ck_pring/dequeue.h>
#include <ck_pring/enqueue.h>
#include <ck_pring/snoop.h>

/**
 * Lock-free *P*C disruptor for pointer-sized values.
 *
 * The consumer and producer sides may be independently specialized to
 * the single producer/consumer case.
 *
 * On my 2.2 GHz Haswell laptop, the best-case inverse throughput for:
 * - SP is ~4 cycles/enqueue;
 * - SC is ~5 cycles/dequeue;
 * - MP is ~24 cycles/enqueue;
 * - MC is ~27 cycles/dequeue.
 *
 * Consumption is slightly slower than production, but consumption can
 * be batched.
 *
 * The ring buffer for the pointer-sized values is an array of
 * struct ck_pring_elt, pairs of:
 * - generation counter;
 * - value.
 *
 * The generation counter prevents ABA: when a producer is slow, it's
 * hard to tell if a ring buffer element is *still* empty, or *newly*
 * empty.
 *
 * This ring buffer attempts to minimise the communication (cache
 * coherency traffic) between classes of threads (producers and
 * consumers). Producers access the `prod` field, read/write to the
 * ring buffer, and only read the `cons` field to refresh their cache
 * of the consumer(s) cursor. Consumers access the `cons` field, and
 * read the ring buffer; they *never* access the `prod` field.
 *
 * Production thus mostly incurs traffic for communications between
 * producers (hard to avoid), any compulsory traffic on the ring
 * buffer (if the buffer is large enough, this is only between
 * producers). Producers only interact with consumers when their
 * snapshot of the consumer cursor hints that the buffer might be
 * full. If consumers keep up with producers, this should only happen
 * ~once every buffer_size enqueues.
 *
 * Consumption only incurs traffic for communications between
 * consumers (racing on the consumption cursor), and any compulsory
 * traffic to read the ring buffer (negligible with a large enough
 * buffer).
 *
 * Producers race along an unbounded sequence of generation counters
 * (64 bit pointers) to atomically acquire empty cells and fill them
 * with their value. The actual storage for this unbounded sequence
 * is the ring buffer: the cell for sequence id x is `x % ring_size`.
 *
 * The cell is available if its generation value is < the generation
 * counter we wish to acquire. The cell must also have been released
 * by the consumer(s). The producer struct has a cache of the
 * consumer(s)'s cursor; any generation value strictly less than that
 * cursor is fair game. Of course, the cache may be stale (but must
 * never be ahead of the actual value); when producers notice that the
 * cached consumer cursor is too low, they update it before exiting
 * with failure (full ring).
 *
 * The linear search over an unbounded sequence of generation counters
 * isn't practical. Producers maintain an approximate cache of the
 * next free generation counter: when a producer succeeds at producing
 * a value for sequence counter `gen`, it (racily) stores `gen + 1` in
 * prod.cursor. Producers thus begin their search at `prod.cursor`.
 * That search is bounded: it can't go further than the consumer's
 * cursor + ring_size - 1. Producers will only retry if the
 * consumer's cursor has moved ahead, in which case the system has
 * made global progress.
 *
 * Concurrent production uses a double-wide CAS (CMPXCHG16B) to
 * atomically acquire/update the generation counter and set the value
 * (we only need single compare / double-wide set), which trivially
 * guarantees visibility (on TSO). For the SP case, we must be
 * careful to write the value before publishing it by updating the
 * generation.
 *
 * Consuming is easier. The consumer(s) maintain a cursor for the
 * next sequence value to consume. The storage for that sequence
 * is always at `cursor % ring_size`. They simply have to wait
 * until that cell is populated with the correct sequence value,
 * and update/race on cons.cursor.
 *
 * The value in cons.cursor imposes a limit on the largest generation
 * that could be produced, so it's safe to:
 * 1. read the generation;
 * 2. read the value;
 * 3. update cons.cursor.
 *
 * If the generation matches at step 1, it must also match at step 2,
 * since it can only increase after step 3. This assumes fences
 * between each step... fences that are happily no-ops on TSO.
 */
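For illustration, the producer-side claim rule described above, written out as a tiny helper; this is a sketch, not a function the commit defines, and the names are assumptions.

/*
 * A producer that wants to publish sequence number seq may claim slot
 * (seq & mask) iff the generation currently stored in that slot is
 * older than seq *and* every consumer has already moved past that
 * slot's previous lap, i.e. seq - consumer_cursor_snapshot <= mask.
 */
#include <stdbool.h>
#include <stdint.h>

static bool
may_claim_slot(uint64_t slot_gen, uint64_t seq, uint64_t consumer_snap,
    uint64_t mask)
{

	return (int64_t)(slot_gen - seq) < 0 && (seq - consumer_snap) <= mask;
}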

/**
 * Size in bytes for a pring with n_consumer consumers.
 */
size_t
ck_pring_allocation_size(size_t n_consumer);

/**
 * Initialize a pring of n_consumer consumers.
 */
void
ck_pring_init(struct ck_pring *, size_t n_consumer, struct ck_pring_elt *, size_t);

/**
 * Allocate a pring for n consumers, with buf of bufsz elements.
 *
 * If consumers have dependencies, the caller is expected to set up
 * the dependency begin/end half-open ranges: a consumer block with
 * dependency [begin, end) will only process items after they have
 * been consumed by consumer ids [begin, end).
 */
struct ck_pring *
ck_pring_create(struct ck_malloc *,
    size_t n_consumer, struct ck_pring_elt *buf, size_t bufsz);

/**
 * Deallocate the pring *but not the buffer*.
 */
void
ck_pring_destroy(struct ck_malloc *, struct ck_pring *);

static inline size_t
ck_pring_size(const struct ck_pring *);

struct ck_pring_elt *
ck_pring_buffer(const struct ck_pring *);

/**
 * Inline implementation.
 */
static inline size_t
ck_pring_size(const struct ck_pring *ring)
{

	return 1 + ring->prod.mask;
}
#endif /* !CK_PRING_H */
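For illustration, a minimal single-producer/single-consumer use of the API above; the ck_malloc wrappers, the buffer size, and the polling policy are assumptions, not part of this change, and struct ck_malloc's fields are used as this commit itself uses them (malloc(size), free(ptr, size, defer)).

#include <ck_pring.h>
#include <stdio.h>
#include <stdlib.h>

static void *wrap_malloc(size_t n) { return malloc(n); }
static void wrap_free(void *p, size_t n, bool defer)
{

	(void)n;
	(void)defer;
	free(p);
}

static struct ck_malloc allocator = {
	.malloc = wrap_malloc,
	.free = wrap_free
};

int
main(void)
{
	/* Static, hence zero-initialized; size must be a power of two. */
	static struct ck_pring_elt buffer[128];
	struct ck_pring *ring = ck_pring_create(&allocator, 1, buffer, 128);

	/* Single producer; 0 is reserved as the "empty" sentinel. */
	for (uintptr_t i = 1; i <= 10; i++) {
		while (!ck_pring_senqueue(ring, i)) {
			/* Ring full: wait for the consumer. */
		}
	}

	/* Single consumer, consumer block 0. */
	for (size_t read = 0; read < 10; ) {
		uintptr_t v = ck_pring_sdequeue(ring, 0);

		if (v != 0) {
			printf("%zu\n", (size_t)v);
			read++;
		}
	}

	ck_pring_destroy(&allocator, ring);
	return 0;
}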
@ -0,0 +1,138 @@
#ifndef PRING_COMMON_H
#define PRING_COMMON_H
#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct ck_pring_elt {
	char *gen;
	uintptr_t value;
} CK_CC_ALIGN(2 * sizeof(void *));

/**
 * State for producers.
 *
 * consumer_snap is a lower bound on the oldest consumer cursor.
 * We must not write past consumer_snap + mask. It is monotonic
 * in the SP case, best effort otherwise.
 *
 * cursor is a lower bound on the next empty element. It is exact
 * (and thus monotonic) in the single-producer case, best effort for
 * MP.
 */
struct ck_pring_producer {
	uint64_t consumer_snap; /* <= cons.cursor, accessed by prods. */
	uint64_t cursor; /* <= real prod.cursor, accessed by prods & snoops. */
	struct ck_pring_elt *buf; /* write once */
	uint64_t mask; /* write once */
	size_t n_consumer;
	uintptr_t dummy;
};

/**
 * State for consumers.
 *
 * cursor is exact (and thus monotonic), the index for the next value
 * we wish to read. Any sequence < cursor is safe to overwrite (which
 * lets writers go up to (cursor - 1 + size) = cursor + mask).
 *
 * read_limit is a lower bound on the exclusive range that is safe to
 * read (i.e., we can always read non-empty sequence < read_limit).
 * Normal consumers, without consumer dependencies, mostly ignore this
 * field (it is set to arbitrarily high values). Consumers with
 * dependencies use this field to make sure they never overtake their
 * parents: at any time, for any parent, read_limit <= parent.cursor.
 * read_limit is monotonic for SC, best effort for MC.
 *
 * dependency_begin, end mark the half-open range of this consumer's
 * parents.
 */
struct ck_pring_consumer {
	const struct ck_pring_elt *buf; /* write-once. */
	uint64_t mask; /* write-once. */
	uint64_t cursor; /* written by consumers, read by all. */
	uint64_t read_limit; /* private. cursor < read_limit. */
	uint32_t dependency_begin; /* write once */
	uint32_t dependency_end; /* write once */
};

/**
 * A snooper is a consumer hidden from the rest of the system.
 * Instead of relying on cursor to protect reads, snoopers must check
 * the generation counter on ring buffer elements.
 */
struct ck_pring_snooper {
	struct ck_pring_consumer cons;
};

struct ck_pring_consumer_block {
	struct ck_pring_consumer cons;
	char padding[CK_MD_CACHELINE - sizeof(struct ck_pring_consumer)];
} CK_CC_CACHELINE;

/**
 * Pack more consumer blocks immediately after ck_pring for
 * multi-consumer allocations.
 */
struct ck_pring {
	struct ck_pring_producer prod;
	char padding[CK_MD_CACHELINE - sizeof(struct ck_pring_producer)];
	struct ck_pring_consumer_block cons;
} CK_CC_CACHELINE;

#define CK_PRING_INIT(BUF)						\
	CK_PRING_INIT_((BUF),						\
	    CK_PRING_PO2_(sizeof(BUF) / sizeof(struct ck_pring_elt)),	\
	    1)

/**
 * Inline internals.
 */

/**
 * Return the data block for consumer index (< n_consumer).
 */
static CK_CC_FORCE_INLINE struct ck_pring_consumer *
ck_pring_consumer_by_id(struct ck_pring *ring, size_t index)
{
	struct ck_pring_consumer_block *consumers = &ring->cons;

	/* index should only be > 0 with a heap-allocated tail of consumers. */
	return &consumers[index].cons;
}

/**
 * Update the read limit for the consumer. If there are no
 * dependencies, the limit is pushed far ahead so that this is called
 * very rarely.
 *
 * Return the new capacity if it is strictly positive, 0 otherwise.
 */
uintptr_t
ck_pring_consumer_update_limit(struct ck_pring_consumer *,
    const struct ck_pring *);

#define CK_PRING_INIT_(BUF, SIZE, N_CONSUMER)	\
	{					\
		.prod = {			\
			.consumer_snap = (SIZE),\
			.cursor = (SIZE),	\
			.buf = (BUF),		\
			.mask = (SIZE) - 1,	\
			.n_consumer = (N_CONSUMER) \
		},				\
		.cons = {			\
			.cons = {		\
				.buf = (BUF),	\
				.mask = (SIZE) - 1, \
				.cursor = (SIZE) \
			}			\
		}				\
	}

#define CK_PRING_PO2_(X)						\
	((X) * sizeof(char[2 * !!((X) > 0 && ((X) & ((X) - 1)) == 0) - 1]))
#endif /* !PRING_COMMON_H */
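For illustration, how the CK_PRING_INIT macro above can build a statically allocated single-consumer ring; the array name is an assumption. CK_PRING_PO2_ acts as a compile-time guard: for a non-power-of-two element count it expands to a negative array size and the initializer fails to compile.

static struct ck_pring_elt ring_storage[64];	/* must be a power of two. */
static struct ck_pring static_ring = CK_PRING_INIT(ring_storage);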
@ -0,0 +1,332 @@
#ifndef PRING_DEQUEUE_H
#define PRING_DEQUEUE_H
#include <ck_pring/common.h>

/**
 * Approximately how many entries are available for consumption. Only
 * useful if dependencies are active.
 */
static inline size_t
ck_pring_consume_capacity(struct ck_pring *, size_t index);

/**
 * Dequeue a value from a single-threaded consumer block.
 *
 * Return 0 if there is no new value in the pring.
 */
static inline uintptr_t
ck_pring_sdequeue(struct ck_pring *, size_t index);

/**
 * Read a value from a single-threaded consumer block.
 *
 * Return 0 if there is no new value in the pring.
 */
static inline uintptr_t
ck_pring_sread(struct ck_pring *, size_t index);

/**
 * Consume the last value returned by sread for a single-thread consumer block.
 */
static inline void
ck_pring_sconsume(struct ck_pring *, size_t index);

/**
 * Dequeue up to n values from a single-thread consumer block.
 *
 * Return the number of values read and written to dst.
 */
size_t
ck_pring_sdequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);

/**
 * Read up to n values from a single-thread consumer block.
 *
 * Return the number of values read and written to dst.
 */
size_t
ck_pring_sread_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);

/**
 * Consume the last n values returned by sread_n.
 */
static inline void
ck_pring_sconsume_n(struct ck_pring *, size_t index, size_t n);

/**
 * Dequeue a value from a multi-consumer block.
 *
 * Return 0 if there is no new value.
 */
static inline uintptr_t
ck_pring_mdequeue(struct ck_pring *, size_t index);

static inline uintptr_t
ck_pring_mtrydequeue(struct ck_pring *, size_t index);

static inline uintptr_t
ck_pring_mread(struct ck_pring *, size_t index, uint64_t *OUT_gen);

static inline uintptr_t
ck_pring_mtryread(struct ck_pring *, size_t index, uint64_t *OUT_gen);

static inline bool
ck_pring_mconsume(struct ck_pring *, size_t index, uint64_t gen);

static inline size_t
ck_pring_mdequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);

static inline size_t
ck_pring_mtrydequeue_n(struct ck_pring *, size_t index, uintptr_t *dst, size_t n);

static inline size_t
ck_pring_mread_n(struct ck_pring *, size_t index,
    uintptr_t *dst, size_t n, uint64_t *OUT_gen);

static inline size_t
ck_pring_mtryread_n(struct ck_pring *, size_t index,
    uintptr_t *dst, size_t n, uint64_t *OUT_gen);

static inline bool
ck_pring_mconsume_n(struct ck_pring *, size_t index, uint64_t gen, size_t n);

/**
 * Inline implementation.
 */
static inline size_t
ck_pring_consume_capacity(struct ck_pring *ring, size_t index)
{
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	uint64_t cursor = cons->cursor;
	uint64_t limit = cons->read_limit;

	if (CK_CC_UNLIKELY((int64_t)(cursor - limit) >= 0)) {
		return ck_pring_consumer_update_limit(cons, ring);
	}

	return limit - cursor;
}

uintptr_t ck_pring_sdequeue_slow(struct ck_pring *, size_t);

static inline uintptr_t
ck_pring_sdequeue(struct ck_pring *ring, size_t index)
{
	struct ck_pring_elt snap;
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t cursor = cons->cursor; /* only writer is us. */
	size_t loc = cursor & mask;

	if (CK_CC_UNLIKELY((int64_t)(cursor - cons->read_limit) >= 0)) {
		/*
		 * This only happens with dependencies, when consumers
		 * catch up to the parents. This will always be
		 * complex, and I don't feel bad about slowing down
		 * consumers that are going too fast.
		 */
		return ck_pring_sdequeue_slow(ring, index);
	}

	/*
	 * We know where the next element we wish to consume lives.
	 * Load its generation *before* its value. If the generation
	 * matches our cursor, consume the element and return the
	 * value.
	 */
	snap.gen = ck_pr_load_ptr(&buf[loc].gen);
#ifdef assert
	assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 &&
	    "Concurrent dequeue in sdequeue?");
#endif
	ck_pr_fence_load();
	/* read gen before value. cursor is always an upper bound on gen. */
	snap.value = ck_pr_load_ptr(&buf[loc].value);
	if (CK_CC_UNLIKELY((uint64_t)snap.gen != cursor)) {
		/*
		 * This will tend to be either always false (normal
		 * operations) or always true (queue is empty); a cmov
		 * would be easy but would be slightly slower than a
		 * predictable branch in both cases.
		 */
		return 0;
	}

	ck_pr_fence_load_store();
	/*
	 * Producers will read cons.cursor. Make sure to consume the
	 * cell's value before releasing, and make sure cursor is safe
	 * to read by other threads.
	 */
	ck_pr_store_64(&cons->cursor, cursor + 1);
	return snap.value;
}

uintptr_t ck_pring_sread_slow(struct ck_pring *, size_t);

static inline uintptr_t
ck_pring_sread(struct ck_pring *ring, size_t index)
{
	struct ck_pring_elt snap;
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t cursor = cons->cursor;
	size_t loc = cursor & mask;

	if (CK_CC_UNLIKELY((int64_t)(cursor - cons->read_limit) >= 0)) {
		return ck_pring_sread_slow(ring, index);
	}

	snap.gen = ck_pr_load_ptr(&buf[loc].gen);
#ifdef assert
	assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 &&
	    "Concurrent dequeue in sread?");
#endif
	ck_pr_fence_load();
	snap.value = ck_pr_load_ptr(&buf[loc].value);
	return ((uint64_t)snap.gen == cursor) ? snap.value : 0;
}

static inline void
ck_pring_sconsume(struct ck_pring *ring, size_t index)
{

	ck_pring_sconsume_n(ring, index, 1);
	return;
}

static inline void
ck_pring_sconsume_n(struct ck_pring *ring, size_t index, size_t n)
{
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);

	ck_pr_fence_load_store();
	ck_pr_store_64(&cons->cursor, cons->cursor + n);
	return;
}

uintptr_t
ck_pring_mdequeue_generic(struct ck_pring *ring, size_t index, bool hard);

static inline uintptr_t
ck_pring_mdequeue(struct ck_pring *ring, size_t index)
{

	return ck_pring_mdequeue_generic(ring, index, true);
}

static inline uintptr_t
ck_pring_mtrydequeue(struct ck_pring *ring, size_t index)
{

	return ck_pring_mdequeue_generic(ring, index, false);
}

uintptr_t ck_pring_mread_slow(struct ck_pring *, size_t, uint64_t *, bool);

static inline uintptr_t
ck_pring_mread_generic(struct ck_pring *ring, size_t index,
    uint64_t *OUT_gen, bool hard)
{
	struct ck_pring_elt snap;
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t cursor = ck_pr_load_64(&cons->cursor);
	uint64_t read_limit = ck_pr_load_64(&cons->read_limit);
	size_t loc = cursor & mask;

	if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) {
		goto slow;
	}

	snap.gen = ck_pr_load_ptr(&buf[loc].gen);
	ck_pr_fence_load();
	snap.value = ck_pr_load_ptr(&buf[loc].value);
	if (CK_CC_UNLIKELY((int64_t)((uint64_t)snap.gen - cursor) < 0)) {
		return 0;
	}

	ck_pr_fence_load();
	if (CK_CC_UNLIKELY((uint64_t)ck_pr_load_ptr(&buf[loc].gen) != cursor)) {
		goto slow;
	}

	*OUT_gen = cursor; /* report the claimed generation for mconsume. */
	return snap.value;

slow:
	return ck_pring_mread_slow(ring, index, OUT_gen, hard);
}

static inline uintptr_t
ck_pring_mread(struct ck_pring *ring, size_t index, uint64_t *OUT_gen)
{

	return ck_pring_mread_generic(ring, index, OUT_gen, true);
}

static inline uintptr_t
ck_pring_mtryread(struct ck_pring *ring, size_t index, uint64_t *OUT_gen)
{

	return ck_pring_mread_generic(ring, index, OUT_gen, false);
}

static inline bool
ck_pring_mconsume(struct ck_pring *ring, size_t index, uint64_t gen)
{

	return ck_pring_mconsume_n(ring, index, gen, 1);
}

size_t
ck_pring_mdequeue_n_generic(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n, bool hard);

static inline size_t
ck_pring_mdequeue_n(struct ck_pring *ring, size_t index, uintptr_t *dst, size_t n)
{

	return ck_pring_mdequeue_n_generic(ring, index, dst, n, true);
}

static inline size_t
ck_pring_mtrydequeue_n(struct ck_pring *ring, size_t index, uintptr_t *dst, size_t n)
{

	return ck_pring_mdequeue_n_generic(ring, index, dst, n, false);
}

static inline bool
ck_pring_mconsume_n(struct ck_pring *ring, size_t index, uint64_t gen, size_t n)
{
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);

	ck_pr_fence_load_store();
	return ck_pr_cas_64(&cons->cursor, gen, gen + n);
}

size_t
ck_pring_mread_n_generic(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n, uint64_t *OUT_gen, bool hard);

static inline size_t
ck_pring_mread_n(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n, uint64_t *OUT_gen)
{

	return ck_pring_mread_n_generic(ring, index,
	    dst, n, OUT_gen, true);
}

static inline size_t
ck_pring_mtryread_n(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n, uint64_t *OUT_gen)
{

	return ck_pring_mread_n_generic(ring, index,
	    dst, n, OUT_gen, false);
}
#endif /* !PRING_DEQUEUE_H */
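For illustration, a multi-consumer worker loop built on the declarations above; handle() and the polling policy are placeholders, not part of this change.

void handle(uintptr_t);

static void
mc_worker_sketch(struct ck_pring *ring, size_t my_index)
{

	for (;;) {
		uintptr_t v = ck_pring_mdequeue(ring, my_index);

		if (v == 0) {
			/* Empty: 0 is reserved as the "no value" sentinel. */
			ck_pr_stall();
			continue;
		}

		/* mdequeue already won the race on the shared cursor. */
		handle(v);
	}
}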
@ -0,0 +1,143 @@
#ifndef PRING_ENQUEUE_H
#define PRING_ENQUEUE_H
#include <ck_pring/common.h>

/**
 * Return an approximation of the remaining capacity in the ring.
 *
 * Exact for single producer.
 */
static inline size_t
ck_pring_enqueue_capacity(struct ck_pring *);

/**
 * Attempt to enqueue one value in a single-producer pring.
 *
 * Return true iff success.
 */
static inline bool
ck_pring_senqueue(struct ck_pring *, uintptr_t);

/**
 * Attempt to enqueue one value in a single-producer pring.
 *
 * Return true iff success, in which case old_value was overwritten
 * with the value previously in the ring buffer element. The value
 * is unspecified on failure.
 */
static inline bool
ck_pring_senqueue_val(struct ck_pring *, uintptr_t, uintptr_t *old_value);

/**
 * Attempt to enqueue up to n values in a single-producer pring.
 *
 * Return the number of values enqueued; values[0 .. ret) is updated
 * with the value previously in the ring buffer elements.
 */
size_t
ck_pring_senqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n);

/**
 * Attempt to enqueue one value in a multi-producer pring.
 *
 * Return true iff success.
 */
static inline bool
ck_pring_menqueue(struct ck_pring *, uintptr_t);

/**
 * Attempt to enqueue one value in a multi-producer pring.
 *
 * Return true iff success, in which case old_value was overwritten
 * with the value previously in the ring buffer element. The value
 * is unspecified on failure.
 */
bool
ck_pring_menqueue_val(struct ck_pring *, uintptr_t, uintptr_t *old_value);

/**
 * Attempt to enqueue up to n values in a multi-producer pring.
 *
 * Return the number of values enqueued; values[0 .. ret) is updated
 * with the value previously in the ring buffer elements.
 */
size_t
ck_pring_menqueue_n(struct ck_pring *, uintptr_t *values, size_t n);

/**
 * Inline implementation.
 */
static inline bool
ck_pring_menqueue(struct ck_pring *ring, uintptr_t value)
{
	uintptr_t buf;

	(void)buf;
	return ck_pring_menqueue_val(ring, value, &buf);
}

size_t
ck_pring_enqueue_capacity_slow(struct ck_pring *ring);

static inline size_t
ck_pring_enqueue_capacity(struct ck_pring *ring)
{
	uint64_t mask = ring->prod.mask;
	uint64_t consumer_snap = ring->prod.consumer_snap;
	uint64_t cursor = ring->prod.cursor;

	if (CK_CC_UNLIKELY(cursor - consumer_snap > mask)) {
		return ck_pring_enqueue_capacity_slow(ring);
	}

	return (consumer_snap + mask + 1) - cursor;
}

static inline bool
ck_pring_senqueue(struct ck_pring *ring, uintptr_t value)
{

	return ck_pring_senqueue_val(ring, value, &ring->prod.dummy);
}

bool
ck_pring_senqueue_val_slow(struct ck_pring *ring, uintptr_t value,
    uintptr_t *old_value);

static inline bool
ck_pring_senqueue_val(struct ck_pring *ring, uintptr_t value,
    uintptr_t *old_value)
{
	struct ck_pring_elt *buf = ring->prod.buf;
	struct ck_pring_elt *dst;
	uint64_t mask = ring->prod.mask;
	/* only writer to prod.* is us. */
	uint64_t consumer_snap = ring->prod.consumer_snap;
	uint64_t cursor = ring->prod.cursor;
	size_t loc = cursor & mask;

	/*
	 * We know where we want to write. Make sure our snapshot of
	 * the consumer cursor lets us write there (or update the
	 * snapshot), and write the value *before* publishing
	 * the new generation.
	 */
	dst = &buf[loc];
#ifdef __GNUC__
	__asm__("" : "+r"(dst)); /* compute dst before the branch. */
#endif
	if (CK_CC_UNLIKELY((cursor - consumer_snap) > mask)) {
		return ck_pring_senqueue_val_slow(ring, value, old_value);
	}

	/* We're not too far. Do the write! */
	*old_value = dst->value;
	ck_pr_store_ptr((void **)&dst->value, (void *)value);
	ck_pr_fence_store();
	ck_pr_store_ptr(&dst->gen, (void *)cursor);

	ck_pr_fence_store();
	ck_pr_store_64(&ring->prod.cursor, cursor + 1);
	return true;
}
#endif /* !PRING_ENQUEUE_H */
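For illustration, one way the old_value out-parameter above can be used: the enqueue returns whatever pointer previously sat in the overwritten cell, so a producer can recycle retired objects instead of tracking them separately. The message type and free_message() are assumptions.

struct message;
void free_message(struct message *);

static bool
publish_sketch(struct ck_pring *ring, struct message *msg)
{
	uintptr_t old = 0;

	if (!ck_pring_senqueue_val(ring, (uintptr_t)msg, &old)) {
		return false;	/* ring full. */
	}

	/*
	 * On the first lap the cell still holds 0; afterwards, old is a
	 * message that every consumer passed a full ring-size ago, so it
	 * is safe to reclaim here, provided consumers drop their
	 * references once they consume.
	 */
	if (old != 0) {
		free_message((struct message *)old);
	}

	return true;
}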
@ -0,0 +1,93 @@
#ifndef PRING_SNOOP_H
#define PRING_SNOOP_H
#include <ck_pring/common.h>

/**
 * Initialize a snooper block. Snoopers are consumers that do not
 * block producers. dep_begin, dep_end specify a range of consumer
 * ids not to overtake.
 */
void
ck_pring_snoop_init(struct ck_pring_snooper *, const struct ck_pring *,
    uint32_t dep_begin, uint32_t dep_end);

/**
 * Approximately how many entries are available for snooping. Only
 * useful if dependencies are active.
 */
static inline size_t
ck_pring_snoop_capacity(struct ck_pring_snooper *, const struct ck_pring *);

/**
 * Snoop the next value from the pring.
 *
 * Return 0 on failure.
 */
static inline uintptr_t
ck_pring_snoop(struct ck_pring_snooper *, const struct ck_pring *);

/**
 * Snoop up to n values from the pring.
 *
 * Return the number of values snooped and written to dst.
 */
size_t
ck_pring_snoop_n(struct ck_pring_snooper *, const struct ck_pring *,
    uintptr_t *dst, size_t n);

/**
 * Inline implementation.
 */
static inline size_t
ck_pring_snoop_capacity(struct ck_pring_snooper *snoop,
    const struct ck_pring *ring)
{
	uint64_t cursor = snoop->cons.cursor;
	uint64_t limit = snoop->cons.read_limit;

	if (CK_CC_UNLIKELY((int64_t)(cursor - limit) >= 0)) {
		return ck_pring_consumer_update_limit(&snoop->cons, ring);
	}

	return limit - cursor;
}

uintptr_t
ck_pring_snoop_slow(struct ck_pring_snooper *snoop,
    const struct ck_pring *ring);

static inline uintptr_t
ck_pring_snoop(struct ck_pring_snooper *snoop, const struct ck_pring *ring)
{
	struct ck_pring_elt snap;
	const struct ck_pring_elt *buf = snoop->cons.buf;
	uint64_t mask = snoop->cons.mask;
	uint64_t cursor = snoop->cons.cursor;
	size_t loc = cursor & mask;

	if (CK_CC_UNLIKELY((int64_t)(cursor - snoop->cons.read_limit) >= 0)) {
		goto slow;
	}

	snap.gen = ck_pr_load_ptr(&buf[loc].gen);
	ck_pr_fence_load();
	/* read gen before value. cursor is an upper bound on gen. */
	snap.value = ck_pr_load_ptr(&buf[loc].value);
	if (CK_CC_UNLIKELY((int64_t)((uint64_t)snap.gen - cursor) < 0)) {
		/* gen is too old. queue is still empty. */
		return 0;
	}

	ck_pr_fence_load();
	if (CK_CC_UNLIKELY((uint64_t)ck_pr_load_ptr(&buf[loc].gen) != cursor)) {
		/* gen doesn't match and/or cursor is out of date; try again. */
		goto slow;
	}

	snoop->cons.cursor = cursor + 1;
	return snap.value;

slow:
	return ck_pring_snoop_slow(snoop, ring);
}
#endif /* !PRING_SNOOP_H */
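For illustration, a snooper that samples the stream without ever blocking producers; log_value() is a placeholder, and an empty dependency range (0, 0) means the snooper may skip items if it falls behind.

void log_value(uintptr_t);

static void
snoop_sketch(const struct ck_pring *ring)
{
	struct ck_pring_snooper snoop;

	ck_pring_snoop_init(&snoop, ring, 0, 0);
	for (;;) {
		uintptr_t v = ck_pring_snoop(&snoop, ring);

		if (v != 0) {
			log_value(v);
		}
	}
}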
@ -0,0 +1,109 @@
#define CK_PRING_IMPL
#include <assert.h>
#include <ck_pring.h>

#include <string.h>

size_t
ck_pring_allocation_size(size_t n_consumer)
{
	size_t consumer_size;
	size_t extra = 0;

	if (n_consumer > 0) {
		extra = n_consumer - 1;
	}

	consumer_size = extra * sizeof(struct ck_pring_consumer_block);
	return sizeof(struct ck_pring) + consumer_size;
}

void
ck_pring_init(struct ck_pring *ring, size_t n_consumer,
    struct ck_pring_elt *buf, size_t size)
{
	struct ck_pring_consumer_block *next = (void *)(ring + 1);

	assert(size > 0 && (size & (size - 1)) == 0);
	*ring = (struct ck_pring) CK_PRING_INIT_(buf, size, n_consumer);
	for (size_t i = 1; i < n_consumer; i++) {
		next[i - 1].cons = ring->cons.cons;
	}

	ck_pr_fence_store();
	return;
}

struct ck_pring *
ck_pring_create(struct ck_malloc *ck_malloc,
    size_t n_consumer, struct ck_pring_elt *buf, size_t bufsz)
{
	struct ck_pring *ret;
	size_t ringsz;

	ringsz = ck_pring_allocation_size(n_consumer);
	ret = ck_malloc->malloc(ringsz);
	memset(ret, 0, ringsz);
	ck_pring_init(ret, n_consumer, buf, bufsz);
	return ret;
}

void
ck_pring_destroy(struct ck_malloc *ck_malloc, struct ck_pring *ring)
{
	size_t ringsz;

	if (ring == NULL) {
		return;
	}

	ringsz = ck_pring_allocation_size(ring->prod.n_consumer);
	ck_malloc->free(ring, ringsz, false);
	return;
}

struct ck_pring_elt *
ck_pring_buffer(const struct ck_pring *ring)
{

	return ring->prod.buf;
}

/* Declared in pring_common.h */
uintptr_t
ck_pring_consumer_update_limit(struct ck_pring_consumer *consumer,
    const struct ck_pring *ring)
{
	const struct ck_pring_consumer_block *consumers = &ring->cons;
	uint64_t old_limit = ck_pr_load_64(&consumer->read_limit);
	uint64_t limit = old_limit + (1UL << 60);
	uint64_t capacity;
	size_t dep_begin = consumer->dependency_begin;
	size_t dep_end = consumer->dependency_end;

	/* Common case: no dependency. */
	if (CK_CC_LIKELY(dep_begin >= dep_end)) {
		capacity = consumer->mask + 1;
		ck_pr_store_64(&consumer->read_limit, limit);
		return capacity;
	}

	for (size_t i = dep_end; i --> dep_begin; ) {
		const struct ck_pring_consumer *current = &consumers[i].cons;
		uint64_t current_cursor;
		size_t begin = current->dependency_begin;
		size_t skip;

		current_cursor = ck_pr_load_64(&current->cursor);
		skip = (current->dependency_end >= i) ? begin : i;
		if ((int64_t)(current_cursor - limit) < 0) {
			limit = current_cursor;
		}

		i = (skip < i) ? skip : i;
	}

	capacity = limit - ck_pr_load_64(&consumer->cursor);
	ck_pr_store_64(&consumer->read_limit, limit);
	return ((int64_t)capacity > 0) ? capacity : 0;
}
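For illustration, how a two-stage pipeline might wire up the dependency ranges that ck_pring_consumer_update_limit honours; the allocator, buffer, and helper name are assumptions, and the fields are set directly (via ck_pring_consumer_by_id) before any consumer thread starts, as the ck_pring_create comment asks the caller to do.

static struct ck_pring *
make_pipeline_sketch(struct ck_malloc *allocator, struct ck_pring_elt *buffer,
    size_t bufsz)
{
	struct ck_pring *ring = ck_pring_create(allocator, 2, buffer, bufsz);
	struct ck_pring_consumer *stage2 = ck_pring_consumer_by_id(ring, 1);

	/*
	 * Consumer 1 only processes items already consumed by consumer
	 * ids [0, 1), i.e. it never overtakes consumer 0.
	 */
	stage2->dependency_begin = 0;
	stage2->dependency_end = 1;
	return ring;
}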
@ -0,0 +1,312 @@
#include <assert.h>
#include <ck_pring/dequeue.h>

#include <sys/types.h>

uintptr_t
ck_pring_sdequeue_slow(struct ck_pring *ring, size_t index)
{
	uintptr_t buf[1] = { 0 };
	uintptr_t n;

	n = ck_pring_sdequeue_n(ring, index, buf, 1);
	return buf[0] & -n;
}

uintptr_t
ck_pring_sread_slow(struct ck_pring *ring, size_t index)
{
	uintptr_t buf[1] = { 0 };
	uintptr_t n;

	n = ck_pring_sread_n(ring, index, buf, 1);
	return buf[0] & -n;
}

size_t
ck_pring_sdequeue_n(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n)
{
	size_t read;

	read = ck_pring_sread_n(ring, index, dst, n);
	ck_pring_sconsume_n(ring, index, read);
	return read;
}

size_t
ck_pring_sread_n(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n)
{
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t base_cursor = cons->cursor; /* only writer is us. */
	uint64_t read_limit = ck_pr_load_64(&cons->read_limit);
	size_t capacity = read_limit - base_cursor;
	size_t consumed;

	if (CK_CC_UNLIKELY((int64_t)(base_cursor - read_limit) >= 0)) {
		capacity = ck_pring_consumer_update_limit(cons, ring);
		if (capacity == 0) {
			return capacity;
		}
	}

	n = (n > capacity) ? capacity : n;
	/*
	 * No check for n == 0. This situation should be rare, and
	 * the code below correctly handles it.
	 */

	/*
	 * See if we can immediately read n values. We know values
	 * from base_cursor onward have not been overwritten. We only
	 * have to check if the last item we wish to read has been
	 * produced before copying everything.
	 */
	{
		struct ck_pring_elt snap;
		uint64_t last_cursor = base_cursor + n - 1;
		size_t last_loc = last_cursor & mask;

		snap.gen = ck_pr_load_ptr(&buf[last_loc].gen);
		ck_pr_fence_load();
		if (CK_CC_UNLIKELY((uint64_t)snap.gen != last_cursor)) {
			goto slow;
		}

		for (size_t i = 0; i < n; i++) {
			uint64_t cursor = base_cursor + i;
			size_t loc = cursor & mask;

			dst[i] = ck_pr_load_ptr(&buf[loc].value);
		}

		return n;
	}

slow:
	for (consumed = 0; consumed < n; consumed++) {
		struct ck_pring_elt snap;
		uint64_t cursor = base_cursor + consumed;
		size_t loc = cursor & mask;

		snap.gen = ck_pr_load_ptr(&buf[loc].gen);
		ck_pr_fence_load();
		snap.value = ck_pr_load_ptr(&buf[loc].value);
		if ((uint64_t)snap.gen != cursor) {
			assert((int64_t)((uint64_t)snap.gen - cursor) <= 0 &&
			    "Concurrent dequeue in sdequeue?");
			break;
		}

		dst[consumed] = snap.value;
	}

	return consumed;
}

/**
 * Update snap with the value for cursor in the ring buffer.
 * Returns a comparator value for generation ?= cursor.
 * 0 means we have the value we're looking for.
 * Negative means the value is older than expected (the cell
 * is empty).
 * Positive means the value is younger than expected (we
 * were too slow and lost the race).
 */
static inline int64_t
preacquire(struct ck_pring_elt *snap,
    const struct ck_pring_elt *buf, uint64_t mask, uint64_t cursor)
{
	size_t loc = cursor & mask;

	snap->gen = ck_pr_load_ptr(&buf[loc].gen);
	ck_pr_fence_load();
	snap->value = ck_pr_load_ptr(&buf[loc].value);

	return (int64_t)((uint64_t)snap->gen - cursor);
}

uintptr_t
ck_pring_mdequeue_generic(struct ck_pring *ring, size_t index, bool hard)
{
	struct ck_pring_elt snap;
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t cursor = ck_pr_load_64(&cons->cursor);
	uint64_t read_limit = ck_pr_load_64(&cons->read_limit);

	if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) {
		uintptr_t ret;

		ret = ck_pring_consumer_update_limit(cons, ring);
		if (ret == 0) {
			return ret;
		}
	}

	/* Fast path, assuming our cursor is up to date. */
	{
		int64_t ret;

		ret = preacquire(&snap, buf, mask, cursor);
		if (CK_CC_LIKELY(ret == 0)) {
			/*
			 * The ring buffer element is up to date.
			 * Attempt to acquire it!
			 */
			if (CK_CC_LIKELY(ck_pr_cas_64(&cons->cursor,
			    cursor, cursor + 1))) {
				return snap.value;
			}
		} else if (CK_CC_LIKELY(ret < 0)) {
			/*
			 * The ring buffer element is too old (still
			 * empty). Fail immediately.
			 */
			return 0;
		}
	}

	{
		uintptr_t arr[1] = { 0 };
		uintptr_t n;

		n = ck_pring_mdequeue_n_generic(ring, index, arr, 1, hard);
		return arr[0] & -n;
	}
}

uintptr_t
ck_pring_mread_slow(struct ck_pring *ring, size_t index,
    uint64_t *OUT_gen, bool hard)
{
	uintptr_t buf[1] = { 0 };
	uintptr_t n;

	n = ck_pring_mread_n_generic(ring, index, buf, 1, OUT_gen, hard);
	return buf[0] & -n;
}

size_t
ck_pring_mdequeue_n_generic(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n, bool hard)
{

	for (;;) {
		uint64_t gen;
		size_t ret;

		ret = ck_pring_mread_n_generic(ring, index, dst, n, &gen, hard);
		if (ret == 0 || ck_pring_mconsume_n(ring, index, gen, ret)) {
			return ret;
		}

		if (!hard) {
			return 0;
		}

		n = (n + 1) / 2;
	}

	return 0;
}

size_t
ck_pring_mread_n_generic(struct ck_pring *ring, size_t index,
    uintptr_t *dst, size_t n, uint64_t *OUT_gen, bool hard)
{
	struct ck_pring_consumer *cons = ck_pring_consumer_by_id(ring, index);
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t base_cursor;
	uint64_t read_limit;
	size_t base_loc;
	size_t capacity;
	size_t consumed;

retry:
	base_cursor = ck_pr_load_64(&cons->cursor);
	read_limit = ck_pr_load_64(&cons->read_limit);
	base_loc = base_cursor & mask;
	capacity = read_limit - base_cursor;

	if (CK_CC_UNLIKELY((int64_t)(base_cursor - read_limit) >= 0)) {
		capacity = ck_pring_consumer_update_limit(cons, ring);
		if (capacity == 0) {
			return capacity;
		}
	}

	n = (n > capacity) ? capacity : n;
	*OUT_gen = base_cursor;

	{
		struct ck_pring_elt snap;
		uint64_t last_cursor = base_cursor + n - 1;
		size_t last_loc = last_cursor & mask;

		snap.gen = ck_pr_load_ptr(&buf[last_loc].gen);
		ck_pr_fence_load();
		if (CK_CC_UNLIKELY((uint64_t)snap.gen != last_cursor)) {
			goto slow;
		}

		for (size_t i = 0; i < n; i++) {
			uint64_t cursor = base_cursor + i;
			size_t loc = cursor & mask;

			dst[i] = ck_pr_load_ptr(&buf[loc].value);
		}

		ck_pr_fence_load();
		if (n <= 1 ||
		    (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen) == base_cursor) {
			return n;
		}

		if (!hard) {
			return 0;
		}

		/* We started with snap.gen == last_cursor, so we lost a race. */
		n = 1;
		goto retry;
	}

slow:
	if (n == 0) {
		return 0;
	}

	for (consumed = 0; consumed < n; consumed++) {
		struct ck_pring_elt snap;
		uint64_t cursor = base_cursor + consumed;
		size_t loc = cursor & mask;

		snap.gen = ck_pr_load_ptr(&buf[loc].gen);
		ck_pr_fence_load();
		snap.value = ck_pr_load_ptr(&buf[loc].value);
		if ((uint64_t)snap.gen != cursor) {
			break;
		}

		dst[consumed] = snap.value;
	}

	if (consumed == 0 && hard) {
		uint64_t gen;

		gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen);
		/* Only retry if we lost the race. */
		if ((int64_t)(gen - base_cursor) > 0) {
			n = 1;
			goto retry;
		}
	}

	return consumed;
}
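The comparisons above (and in the enqueue path below) all use one idiom: two unsigned 64-bit sequence numbers are ordered by the sign of their difference, which stays correct across wraparound as long as the values are within 2^63 of each other. For illustration:

#include <stdbool.h>
#include <stdint.h>

/* true iff sequence a is logically before sequence b. */
static inline bool
seq_before(uint64_t a, uint64_t b)
{

	return (int64_t)(a - b) < 0;
}

/* e.g. seq_before(UINT64_MAX, 0) is true: 0 is one step past 2^64 - 1. */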
@ -0,0 +1,314 @@
#include <assert.h>
#include <ck_pring/enqueue.h>

#include <ck_cc.h>
#include <ck_pr.h>
#include <ck_limits.h>
#include <sys/types.h>

static uint64_t
oldest_consumer_snap(const struct ck_pring *ring, uint64_t cursor)
{
	const struct ck_pring_consumer_block *consumers = &ring->cons;
	size_t n_consumer = ring->prod.n_consumer;
	uint64_t ret = cursor;

	for (size_t i = n_consumer; i --> 0; ) {
		const struct ck_pring_consumer *current = &consumers[i].cons;
		uint64_t current_cursor = ck_pr_load_64(&current->cursor);
		size_t skip = current->dependency_begin;

		if ((int64_t)(current_cursor - ret) < 0) {
			ret = current_cursor;
		}

		/*
		 * Current cursor includes [begin, end). If end >= i,
		 * we may skip everything down to and including begin:
		 * all skipped indices are covered by the current
		 * cursor.
		 */
		i = (current->dependency_end >= i) ? skip : i;
	}

	return ret;
}

size_t
ck_pring_enqueue_capacity_slow(struct ck_pring *ring)
{
	uint64_t mask = ring->prod.mask;
	uint64_t consumer_snap;
	uint64_t cursor = ring->prod.cursor;

	consumer_snap = oldest_consumer_snap(ring, cursor);
	ck_pr_store_64(&ring->prod.consumer_snap, consumer_snap);
	if (cursor - consumer_snap > mask) {
		return 0;
	}

	return (consumer_snap + mask + 1) - cursor;
}

bool
ck_pring_senqueue_val_slow(struct ck_pring *ring, uintptr_t value,
    uintptr_t *old_value)
{
	uint64_t mask = ring->prod.mask;
	uint64_t consumer_snap;
	uint64_t cursor = ring->prod.cursor;

	consumer_snap = oldest_consumer_snap(ring, cursor);
	ring->prod.consumer_snap = consumer_snap;
	if (cursor - consumer_snap > mask) {
		return false;
	}

	return ck_pring_senqueue_val(ring, value, old_value);
}

size_t
ck_pring_senqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n)
{
	struct ck_pring_elt *buf = ring->prod.buf;
	uint64_t mask = ring->prod.mask;
	uint64_t base_cursor = ring->prod.cursor;
	size_t capacity;
	size_t produced;

	capacity = ck_pring_enqueue_capacity(ring);
	if (n > capacity) {
		n = capacity;
	}

	for (produced = 0; produced < n; produced++) {
		struct ck_pring_elt *dst;
		uint64_t cursor = base_cursor + produced;
		uintptr_t previous;
		size_t loc = cursor & mask;

		dst = &buf[loc];
		previous = dst->value;
		ck_pr_store_ptr((void **)&dst->value, (void *)values[produced]);
		ck_pr_fence_store();
		ck_pr_store_ptr(&dst->gen, (void *)cursor);

		values[produced] = previous;
	}

	ck_pr_fence_store();
	ck_pr_store_64(&ring->prod.cursor, base_cursor + produced);
	return produced;
}

/**
 * Assuming that cursor is safe to overwrite (i.e., is at most mask
 * ahead of the read-side cursor), attempt to overwrite an *older*
 * record with our new value.
 *
 * Ret < 0 means success.
 * Ret == 0 means we lost the race to update cursor.
 * Ret > 0 means we lost the race by (at least) a full revolution
 * around the ring buffer.
 */
static int64_t
try_menqueue_one(struct ck_pring_producer *snap,
    uintptr_t value,
    uint64_t cursor,
    uintptr_t *old)
{
	struct ck_pring_elt expected, update;
	struct ck_pring_elt *buf = snap->buf;
	uint64_t actual_gen;
	uint64_t mask = snap->mask;
	uint64_t loc = cursor & mask;
	int64_t ret;

	expected.value = ck_pr_load_ptr(&buf[loc].value);
	/* no barrier here: the CAS will just fail. */
	expected.gen = ck_pr_load_ptr(&buf[loc].gen);
	actual_gen = (uint64_t)expected.gen;

	ret = (int64_t)(actual_gen - cursor);
	if (ret >= 0) {
		/* we're trying to replace a fresh record. fail. */
		goto late;
	}

	update.value = value;
	update.gen = (void *)cursor;
	if (CK_CC_LIKELY(ck_pr_cas_ptr_2_value(&buf[loc], &expected, &update,
	    &expected))) {
		*old = expected.value;
		return ret;
	}

	/*
	 * if we failed, try again. the dwcas gave us a consistent
	 * read, so no spurious failure here.
	 */
	actual_gen = (uint64_t)expected.gen;
	ret = (int64_t)(actual_gen - cursor);
	if (ret >= 0) {
		goto late;
	}

	if (ck_pr_cas_ptr_2_value(&buf[loc], &expected, &update,
	    &expected)) {
		*old = expected.value;
		return ret;
	}

	actual_gen = (uint64_t)expected.gen;
	ret = 0;

late:
	/*
	 * If we're late, we know the next "free" generation value is
	 * at least one more than the one we just observed.
	 */
	snap->cursor = actual_gen + 1;
	return ret;
}

/**
 * Bounded linear search for an empty cell, up to snap->consumer_snap + mask.
 * Return true on success, false otherwise.
 * Update the snapshot's (write) cursor on failure.
 * Update the producer-side cache on success.
 */
static inline bool
try_menqueue(struct ck_pring *ring,
    struct ck_pring_producer *snap,
    uintptr_t value,
    uintptr_t *old)
{
	uint64_t consumer_snap = snap->consumer_snap;
	uint64_t cursor = snap->cursor;
	uint64_t mask = snap->mask;

	if ((int64_t)(cursor - consumer_snap) < 0) {
		cursor = consumer_snap;
	}

	for (; (cursor - consumer_snap) <= mask; cursor++) {
		int64_t ret;

		ret = try_menqueue_one(snap, value, cursor, old);
		if (ret > 0) {
			/*
			 * we're really off. break out of here and
			 * update our snapshots. try_menqueue_one
			 * already updated snap->cursor.
			 */
			return false;
		}

		/*
		 * Success!
		 */
		if (ret < 0) {
			ck_pr_store_64(&ring->prod.cursor, cursor + 1);
			ck_pr_store_64(&ring->prod.consumer_snap,
			    snap->consumer_snap);
			return true;
		}
	}

	snap->cursor = cursor;
	return false;
}

bool
ck_pring_menqueue_val(struct ck_pring *ring, uintptr_t value,
    uintptr_t *old)
{
	struct ck_pring_producer snap;

	snap.buf = ring->prod.buf;
	snap.mask = ring->prod.mask;
	snap.consumer_snap = ck_pr_load_64(&ring->prod.consumer_snap);
	snap.cursor = ck_pr_load_64(&ring->prod.cursor);

	/*
	 * Fast path: snap.cursor isn't too far ahead. Immediately
	 * try to write there.
	 * We only access the producers' struct and the buffer.
	 */
	if (CK_CC_LIKELY((snap.cursor - snap.consumer_snap) <= snap.mask)) {
		if (CK_CC_LIKELY(
		    try_menqueue_one(&snap, value, snap.cursor, old) < 0)) {
			/* Success: racily update our local cursor and win. */
			ck_pr_store_64(&ring->prod.cursor, snap.cursor + 1);
			return true;
		}
	}

	/*
	 * Slow path: update our private snapshot from the producers'
	 * and consumers' structs, until the consumers' cursor stops
	 * moving (if that happens, it's really time to fail).
	 */
	for (;;) {
		uint64_t consumer_snap;
		uint64_t prod_cursor;

		/*
		 * Linear search for an empty cell that's OK to
		 * overwrite. On success, nothing else to do:
		 * try_menqueue updates the producers' cache.
		 */
		if (try_menqueue(ring, &snap, value, old)) {
			return true;
		}

		prod_cursor = ck_pr_load_64(&ring->prod.cursor);
		/*
		 * prod.cursor is a racy hint. Either update
		 * our cache or move the global cursor ahead.
		 */
		if ((int64_t)(prod_cursor - snap.cursor) > 0) {
			snap.cursor = prod_cursor;
		} else {
			ck_pr_store_64(&ring->prod.cursor, snap.cursor);
		}

		consumer_snap = oldest_consumer_snap(ring, snap.cursor);
		/* No progress on the consumer's end. Stop trying. */
		if (consumer_snap == snap.consumer_snap) {
			uint64_t current_snap;

			/* Update the global snap if it's older than ours. */
			current_snap = ck_pr_load_64(&ring->prod.consumer_snap);
			if ((int64_t)(current_snap - consumer_snap) < 0) {
				ck_pr_store_64(&ring->prod.consumer_snap,
				    consumer_snap);
			}

			break;
		}

		snap.consumer_snap = consumer_snap;
	}

	return false;
}

size_t
ck_pring_menqueue_n(struct ck_pring *ring, uintptr_t *values, size_t n)
{

	/* XXX: do this better. */
	for (size_t i = 0; i < n; i++) {
		uintptr_t current = values[i];
		uintptr_t update;

		if (!ck_pring_menqueue_val(ring, current, &update)) {
			return i;
		}

		values[i] = update;
	}

	return n;
}
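For illustration, the layout assumption the double-wide CAS above relies on, written as C11 compile-time checks; this is a sketch, the commit itself does not add these asserts.

#include <ck_pring/common.h>
#include <stdalign.h>

_Static_assert(sizeof(struct ck_pring_elt) == 2 * sizeof(void *),
    "the gen/value pair must be exactly two pointers wide for the dwcas");
_Static_assert(alignof(struct ck_pring_elt) == 2 * sizeof(void *),
    "the pair must be aligned to its full width (16 bytes on 64-bit targets)");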
@ -0,0 +1,134 @@
#include <assert.h>
#include <ck_pring/snoop.h>

#include <sys/types.h>

static bool
snoop_update_cursor(struct ck_pring_snooper *snoop,
    const struct ck_pring *ring, bool init)
{
	const struct ck_pring_elt *buf = snoop->cons.buf;
	uint64_t mask = snoop->cons.mask;
	uint64_t cursor = snoop->cons.cursor;
	uint64_t new_cursor;
	size_t loc = cursor & mask;

	if (snoop->cons.dependency_begin < snoop->cons.dependency_end) {
		(void)ck_pring_consumer_update_limit(&snoop->cons, ring);
		new_cursor = snoop->cons.read_limit - 1;
	} else {
		new_cursor = (uint64_t)ck_pr_load_ptr(&buf[loc].gen);
	}

	if (!init && (int64_t)(cursor - new_cursor) >= 0) {
		return false;
	}

	snoop->cons.cursor = new_cursor;
	return true;
}

void
ck_pring_snoop_init(struct ck_pring_snooper *snoop, const struct ck_pring *ring,
    uint32_t dep_begin, uint32_t dep_end)
{

	snoop->cons.buf = ring->prod.buf;
	snoop->cons.mask = ring->prod.mask;
	snoop->cons.dependency_begin = dep_begin;
	snoop->cons.dependency_end = dep_end;
	(void)snoop_update_cursor(snoop, ring, true);
	return;
}

uintptr_t
ck_pring_snoop_slow(struct ck_pring_snooper *snoop,
    const struct ck_pring *ring)
{
	uintptr_t ret[1] = { 0 };
	uintptr_t n;

	n = ck_pring_snoop_n(snoop, ring, ret, 1);
	/* return 0 if n == 0, otherwise ret (and n == 1). */
	return ret[0] & -n;
}

static ssize_t
ck_pring_snoop_n_inner(struct ck_pring_snooper *snoop, uintptr_t *dst, size_t n)
{
	struct ck_pring_consumer *cons = &snoop->cons;
	const struct ck_pring_elt *buf = cons->buf;
	uint64_t mask = cons->mask;
	uint64_t base_cursor = cons->cursor; /* only writer is us. */
	uint64_t base_gen;
	size_t base_loc = base_cursor & mask;
	size_t consumed;

	base_gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen);
	if ((int64_t)(base_gen - base_cursor) < 0) {
		/* the queue is empty. */
		return 0;
	}

	for (consumed = 0; consumed < n; consumed++) {
		uint64_t cursor = base_cursor + consumed;
		uint64_t gen;
		size_t loc = cursor & mask;

		gen = (uint64_t)ck_pr_load_ptr(&buf[loc].gen);
		ck_pr_fence_load();
		dst[consumed] = ck_pr_load_ptr(&buf[loc].value);
		if (gen != cursor) {
			break;
		}
	}

	ck_pr_fence_load();
	/* everything matched up to here. make sure we didn't lose the race. */
	base_gen = (uint64_t)ck_pr_load_ptr(&buf[base_loc].gen);
	return (base_gen == base_cursor) ? (ssize_t)consumed : -1;
}

size_t
ck_pring_snoop_n(struct ck_pring_snooper *snoop, const struct ck_pring *ring,
    uintptr_t *dst, size_t n)
{
	ssize_t ret;

	if (n == 0) {
		return 0;
	}

	if (n > snoop->cons.mask) {
		n = snoop->cons.mask + 1;
	}

	for (;;) {
		struct ck_pring_consumer *cons = &snoop->cons;
		uint64_t cursor = cons->cursor;
		uint64_t read_limit = cons->read_limit;
		size_t remaining = read_limit - cursor;

		if (CK_CC_UNLIKELY((int64_t)(cursor - read_limit) >= 0)) {
			remaining = ck_pring_consumer_update_limit(cons, ring);
			if (remaining == 0) {
				return remaining;
			}
		}

		n = (n > remaining) ? remaining : n;
		ret = ck_pring_snoop_n_inner(snoop, dst, n);
		if (ret >= 0) {
			break;
		}

		n = (n + 1) / 2;
		if (!snoop_update_cursor(snoop, ring, false)) {
			ret = 0;
			break;
		}
	}

	snoop->cons.cursor = snoop->cons.cursor + (size_t)ret;
	return (size_t)ret;
}