diff --git a/include/ck_ring.h b/include/ck_ring.h index 009d186..e531f0b 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -41,7 +41,8 @@ struct ck_ring { unsigned int c_head; char pad[CK_MD_CACHELINE - sizeof(unsigned int)]; unsigned int p_tail; - char _pad[CK_MD_CACHELINE - sizeof(unsigned int)]; + unsigned int p_head; + char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2]; unsigned int size; unsigned int mask; }; @@ -68,12 +69,32 @@ ck_ring_capacity(const struct ck_ring *ring) return ring->size; } +#define CK_RING_ENQUEUE_SP_DEFINE(ring, buffer, entry, ts, size, P) do {\ + unsigned int consumer, producer, delta; \ + unsigned int mask = ring->mask; \ + \ + consumer = ck_pr_load_uint(&ring->c_head); \ + producer = ring->p_tail; \ + delta = producer + 1; \ + if (P) \ + *(unsigned int *)size = (producer - consumer) & mask; \ + \ + if ((delta & mask) == (consumer & mask)) \ + return false; \ + \ + buffer = (char *)buffer + ts * (producer & mask); \ + memcpy(buffer, entry, ts); \ + \ + /* \ + * Make sure to update slot value before indicating \ + * that the slot is available for consumption. \ + */ \ + ck_pr_fence_store(); \ + ck_pr_store_uint(&ring->p_tail, delta); \ + return true; \ +} while (0) + /* - * Atomically enqueues the specified entry. Returns true on success, returns - * false if the ck_ring is full. This operation only support one active - * invocation at a time and works in the presence of a concurrent invocation - * of ck_ring_dequeue_spsc. - * * This variant of ck_ring_enqueue_spsc returns the snapshot of queue length * with respect to the linearization point. This can be used to extract ring * size without incurring additional cacheline invalidation overhead from the @@ -83,41 +104,12 @@ CK_CC_INLINE static bool _ck_ring_enqueue_spsc_size(struct ck_ring *ring, void *restrict buffer, const void *restrict entry, - unsigned int type_size, - unsigned int *size) -{ - unsigned int consumer, producer, delta; - unsigned int mask = ring->mask; - - consumer = ck_pr_load_uint(&ring->c_head); - producer = ring->p_tail; - delta = producer + 1; - *size = (producer - consumer) & mask; - - if ((delta & mask) == (consumer & mask)) - return false; - - buffer = (char *)buffer + type_size * (producer & mask); - memcpy(buffer, entry, type_size); - - /* - * Make sure to update slot value before indicating - * that the slot is available for consumption. - */ - ck_pr_fence_store(); - ck_pr_store_uint(&ring->p_tail, delta); - return true; -} - -CK_CC_INLINE static bool -ck_ring_enqueue_spsc_size(struct ck_ring *ring, - struct ck_ring_buffer *buffer, - void *entry, + unsigned int ts, unsigned int *size) { - return _ck_ring_enqueue_spsc_size(ring, buffer, &entry, - sizeof(void *), size); + CK_RING_ENQUEUE_SP_DEFINE(ring, buffer, entry, ts, + size, true); } /* @@ -130,28 +122,24 @@ CK_CC_INLINE static bool _ck_ring_enqueue_spsc(struct ck_ring *ring, void *restrict destination, const void *restrict source, - unsigned int size) + unsigned int ts) { - unsigned int consumer, producer, delta; - unsigned int mask = ring->mask; - consumer = ck_pr_load_uint(&ring->c_head); - producer = ring->p_tail; - delta = producer + 1; + CK_RING_ENQUEUE_SP_DEFINE(ring, destination, source, + ts, NULL, false); +} - if ((delta & mask) == (consumer & mask)) - return false; +#undef CK_RING_ENQUEUE_SP_DEFINE - destination = (char *)destination + size * (producer & mask); - memcpy(destination, source, size); +CK_CC_INLINE static bool +ck_ring_enqueue_spsc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry, + unsigned int *size) +{ - /* - * Make sure to update slot value before indicating - * that the slot is available for consumption. - */ - ck_pr_fence_store(); - ck_pr_store_uint(&ring->p_tail, delta); - return true; + return _ck_ring_enqueue_spsc_size(ring, buffer, &entry, + sizeof(entry), size); } CK_CC_INLINE static bool @@ -169,7 +157,7 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, */ CK_CC_INLINE static bool _ck_ring_dequeue_spsc(struct ck_ring *ring, - void *restrict buffer, + const void *restrict buffer, void *restrict target, unsigned int size) { @@ -188,7 +176,7 @@ _ck_ring_dequeue_spsc(struct ck_ring *ring, */ ck_pr_fence_load(); - buffer = (char *)buffer + size * (consumer & mask); + buffer = (const char *)buffer + size * (consumer & mask); memcpy(target, buffer, size); /* @@ -202,12 +190,12 @@ _ck_ring_dequeue_spsc(struct ck_ring *ring, CK_CC_INLINE static bool ck_ring_dequeue_spsc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, + const struct ck_ring_buffer *buffer, void *data) { return _ck_ring_dequeue_spsc(ring, buffer, - data, sizeof(void *)); + data, sizeof(data)); } /* @@ -224,7 +212,7 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, CK_CC_INLINE static bool ck_ring_enqueue_spmc_size(struct ck_ring *ring, struct ck_ring_buffer *buffer, - void *entry, + const void *entry, unsigned int *size) { @@ -241,7 +229,7 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring, CK_CC_INLINE static bool ck_ring_enqueue_spmc(struct ck_ring *ring, struct ck_ring_buffer *buffer, - void *entry) + const void *entry) { return ck_ring_enqueue_spsc(ring, buffer, entry); @@ -249,7 +237,7 @@ ck_ring_enqueue_spmc(struct ck_ring *ring, CK_CC_INLINE static bool _ck_ring_trydequeue_spmc(struct ck_ring *ring, - void *restrict buffer, + const void *buffer, void *data, unsigned int size) { @@ -265,7 +253,7 @@ _ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_pr_fence_load(); - buffer = (char *)buffer + size * (consumer & mask); + buffer = (const char *)buffer + size * (consumer & mask); memcpy(data, buffer, size); ck_pr_fence_store_atomic(); @@ -274,27 +262,28 @@ _ck_ring_trydequeue_spmc(struct ck_ring *ring, CK_CC_INLINE static bool ck_ring_trydequeue_spmc(struct ck_ring *ring, - struct ck_ring_buffer *buffer, + const struct ck_ring_buffer *buffer, void *data) { return _ck_ring_trydequeue_spmc(ring, - buffer, data, sizeof(void *)); + buffer, data, sizeof(data)); } CK_CC_INLINE static bool _ck_ring_dequeue_spmc(struct ck_ring *ring, - void *buffer, + const void *buffer, void *data, - unsigned int size) + unsigned int ts) { + const unsigned int mask = ring->mask; unsigned int consumer, producer; - unsigned int mask = ring->mask; - char *target; consumer = ck_pr_load_uint(&ring->c_head); do { + const char *target; + /* * Producer counter must represent state relative to * our latest consumer snapshot. @@ -307,8 +296,8 @@ _ck_ring_dequeue_spmc(struct ck_ring *ring, ck_pr_fence_load(); - target = (char *)buffer + size * (consumer & mask); - memcpy(data, target, size); + target = (const char *)buffer + ts * (consumer & mask); + memcpy(data, target, ts); /* Serialize load with respect to head update. */ ck_pr_fence_store_atomic(); @@ -322,12 +311,154 @@ _ck_ring_dequeue_spmc(struct ck_ring *ring, CK_CC_INLINE static bool ck_ring_dequeue_spmc(struct ck_ring *ring, + const struct ck_ring_buffer *buffer, + void *data) +{ + + return _ck_ring_dequeue_spmc(ring, buffer, data, + sizeof(*buffer)); +} + +#define CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, size, P) do {\ + const unsigned int mask = ring->mask; \ + unsigned int producer, consumer, delta; \ + bool r = true; \ + \ + producer = ck_pr_load_uint(&ring->p_head); \ + \ + do { \ + /* \ + * The snapshot of producer must be up to date with \ + * respect to consumer. \ + */ \ + ck_pr_fence_load(); \ + consumer = ck_pr_load_uint(&ring->c_head); \ + \ + delta = producer + 1; \ + if ((delta & mask) == (consumer & mask)) { \ + r = false; \ + goto leave; \ + } \ + } while (ck_pr_cas_uint_value(&ring->p_head, \ + producer, \ + delta, \ + &producer) == false); \ + \ + buffer = (char *)buffer + ts * (producer & mask); \ + memcpy(buffer, entry, ts); \ + \ + /* \ + * Wait until all concurrent producers have completed writing \ + * their data into the ring buffer. \ + */ \ + while (ck_pr_load_uint(&ring->p_tail) != producer) \ + ck_pr_stall(); \ + \ + /* \ + * Ensure that copy is completed before updating shared producer\ + * counter. \ + */ \ + ck_pr_fence_store(); \ + ck_pr_store_uint(&ring->p_tail, delta); \ + \ +leave: \ + if (P) \ + *(unsigned int *)size = (producer - consumer) & mask; \ + \ + return r; \ +} while (0) + +CK_CC_INLINE static bool +_ck_ring_enqueue_mpsc_size(struct ck_ring *ring, + void *buffer, + const void *entry, + unsigned int ts, + unsigned int *size) +{ + + CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, size, true); +} + +CK_CC_INLINE static bool +_ck_ring_enqueue_mpsc(struct ck_ring *ring, + void *buffer, + const void *entry, + unsigned int ts) +{ + + CK_RING_ENQUEUE_MP_DEFINE(ring, buffer, entry, ts, NULL, false); +} + +#undef CK_RING_ENQUEUE_MP_DEFINE + +CK_CC_INLINE static bool +ck_ring_enqueue_mpsc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry) +{ + + return _ck_ring_enqueue_mpsc(ring, buffer, &entry, sizeof(entry)); +} + +CK_CC_INLINE static bool +ck_ring_enqueue_mpsc_size(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry, + unsigned int *size) +{ + + return _ck_ring_enqueue_mpsc_size(ring, buffer, &entry, + sizeof(entry), size); +} + +CK_CC_INLINE static bool +ck_ring_dequeue_mpsc(struct ck_ring *ring, + const struct ck_ring_buffer *buffer, + void *data) +{ + + return _ck_ring_dequeue_spsc(ring, buffer, data, + sizeof(data)); +} + +CK_CC_INLINE static bool +ck_ring_enqueue_mpmc(struct ck_ring *ring, + struct ck_ring_buffer *buffer, + const void *entry) +{ + + return _ck_ring_enqueue_mpsc(ring, buffer, &entry, sizeof(entry)); +} + +CK_CC_INLINE static bool +ck_ring_enqueue_mpmc_size(struct ck_ring *ring, struct ck_ring_buffer *buffer, + const void *entry, + unsigned int *size) +{ + + return _ck_ring_enqueue_mpsc_size(ring, buffer, &entry, + sizeof(entry), size); +} + +CK_CC_INLINE static bool +ck_ring_trydequeue_mpmc(struct ck_ring *ring, + const struct ck_ring_buffer *buffer, + void *data) +{ + + return _ck_ring_trydequeue_spmc(ring, + buffer, data, sizeof(data)); +} + +CK_CC_INLINE static bool +ck_ring_dequeue_mpmc(struct ck_ring *ring, + const struct ck_ring_buffer *buffer, void *data) { return _ck_ring_dequeue_spmc(ring, buffer, data, - sizeof(void *)); + sizeof(data)); } CK_CC_INLINE static void @@ -337,6 +468,7 @@ ck_ring_init(struct ck_ring *ring, unsigned int size) ring->size = size; ring->mask = size - 1; ring->p_tail = 0; + ring->p_head = 0; ring->c_head = 0; return; } @@ -384,7 +516,6 @@ ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \ sizeof(struct type), d); \ } \ \ - \ CK_CC_INLINE static bool \ ck_ring_enqueue_spmc_##name(struct ck_ring *a, \ struct type *b, \ @@ -413,14 +544,93 @@ ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ \ return _ck_ring_dequeue_spmc(a, b, c, \ sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_enqueue_mpsc(a, b, c, \ + sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_mpsc_size(a, b, c, \ + sizeof(struct type), d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_dequeue_spsc(a, b, c, \ + sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c, \ + unsigned int *d) \ +{ \ + \ + return _ck_ring_enqueue_mpsc_size(a, b, c, \ + sizeof(struct type), d); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_enqueue_mpsc(a, b, c, \ + sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_trydequeue_spmc(a, \ + b, c, sizeof(struct type)); \ +} \ + \ +CK_CC_INLINE static bool \ +ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \ + struct type *b, \ + struct type *c) \ +{ \ + \ + return _ck_ring_dequeue_spmc(a, b, c, \ + sizeof(struct type)); \ } +/* + * A single producer with one concurrent consumer. + */ #define CK_RING_ENQUEUE_SPSC(name, a, b, c) \ ck_ring_enqueue_spsc_##name(a, b, c) #define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \ ck_ring_enqueue_spsc_size_##name(a, b, c, d) #define CK_RING_DEQUEUE_SPSC(name, a, b, c) \ ck_ring_dequeue_spsc_##name(a, b, c) + +/* + * A single producer with any number of concurrent consumers. + */ #define CK_RING_ENQUEUE_SPMC(name, a, b, c) \ ck_ring_enqueue_spmc_##name(a, b, c) #define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \ @@ -430,4 +640,27 @@ ck_ring_dequeue_spmc_##name(struct ck_ring *a, \ #define CK_RING_DEQUEUE_SPMC(name, a, b, c) \ ck_ring_dequeue_spmc_##name(a, b, c) +/* + * Any number of concurrent producers with up to one + * concurrent consumer. + */ +#define CK_RING_ENQUEUE_MPSC(name, a, b, c) \ + ck_ring_enqueue_mpsc_##name(a, b, c) +#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \ + ck_ring_enqueue_mpsc_size_##name(a, b, c, d) +#define CK_RING_DEQUEUE_MPSC(name, a, b, c) \ + ck_ring_dequeue_mpsc_##name(a, b, c) + +/* + * Any number of concurrent producers and consumers. + */ +#define CK_RING_ENQUEUE_MPMC(name, a, b, c) \ + ck_ring_enqueue_mpmc_##name(a, b, c) +#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \ + ck_ring_enqueue_mpmc_size_##name(a, b, c, d) +#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \ + ck_ring_trydequeue_mpmc_##name(a, b, c) +#define CK_RING_DEQUEUE_MPMC(name, a, b, c) \ + ck_ring_dequeue_mpmc_##name(a, b, c) + #endif /* CK_RING_H */ diff --git a/regressions/ck_ring/benchmark/latency.c b/regressions/ck_ring/benchmark/latency.c index 5e46b1c..657be4d 100644 --- a/regressions/ck_ring/benchmark/latency.c +++ b/regressions/ck_ring/benchmark/latency.c @@ -89,5 +89,54 @@ main(int argc, char *argv[]) } printf("spmc %10d %16" PRIu64 " %16" PRIu64 "\n", size, e_a / ITERATIONS, d_a / ITERATIONS); + + ck_ring_init(&ring, size); + e_a = d_a = s = e = 0; + for (r = 0; r < ITERATIONS; r++) { + for (i = 0; i < size / 4; i += 4) { + s = rdtsc(); + ck_ring_enqueue_mpsc(&ring, buf, &entry); + ck_ring_enqueue_mpsc(&ring, buf, &entry); + ck_ring_enqueue_mpsc(&ring, buf, &entry); + ck_ring_enqueue_mpsc(&ring, buf, &entry); + e = rdtsc(); + } + e_a += (e - s) / 4; + + for (i = 0; i < size / 4; i += 4) { + s = rdtsc(); + ck_ring_dequeue_mpsc(&ring, buf, &entry); + ck_ring_dequeue_mpsc(&ring, buf, &entry); + ck_ring_dequeue_mpsc(&ring, buf, &entry); + ck_ring_dequeue_mpsc(&ring, buf, &entry); + e = rdtsc(); + } + d_a += (e - s) / 4; + } + printf("mpsc %10d %16" PRIu64 " %16" PRIu64 "\n", size, e_a / ITERATIONS, d_a / ITERATIONS); + ck_ring_init(&ring, size); + e_a = d_a = s = e = 0; + for (r = 0; r < ITERATIONS; r++) { + for (i = 0; i < size / 4; i += 4) { + s = rdtsc(); + ck_ring_enqueue_mpmc(&ring, buf, &entry); + ck_ring_enqueue_mpmc(&ring, buf, &entry); + ck_ring_enqueue_mpmc(&ring, buf, &entry); + ck_ring_enqueue_mpmc(&ring, buf, &entry); + e = rdtsc(); + } + e_a += (e - s) / 4; + + for (i = 0; i < size / 4; i += 4) { + s = rdtsc(); + ck_ring_dequeue_mpmc(&ring, buf, &entry); + ck_ring_dequeue_mpmc(&ring, buf, &entry); + ck_ring_dequeue_mpmc(&ring, buf, &entry); + ck_ring_dequeue_mpmc(&ring, buf, &entry); + e = rdtsc(); + } + d_a += (e - s) / 4; + } + printf("mpmc %10d %16" PRIu64 " %16" PRIu64 "\n", size, e_a / ITERATIONS, d_a / ITERATIONS); return (0); } diff --git a/regressions/ck_ring/validate/Makefile b/regressions/ck_ring/validate/Makefile index d8beb77..0b68fad 100644 --- a/regressions/ck_ring/validate/Makefile +++ b/regressions/ck_ring/validate/Makefile @@ -1,6 +1,7 @@ .PHONY: check clean distribution -OBJECTS=ck_ring_spsc ck_ring_spmc ck_ring_spmc_template +OBJECTS=ck_ring_spsc ck_ring_spmc ck_ring_spmc_template ck_ring_mpmc \ + ck_ring_mpmc_template SIZE=16384 all: $(OBJECTS) @@ -8,6 +9,9 @@ all: $(OBJECTS) check: all ./ck_ring_spsc $(CORES) 1 $(SIZE) ./ck_ring_spmc $(CORES) 1 $(SIZE) + ./ck_ring_spmc_template $(CORES) 1 $(SIZE) + ./ck_ring_mpmc $(CORES) 1 $(SIZE) + ./ck_ring_mpmc_template $(CORES) 1 $(SIZE) ck_ring_spsc: ck_ring_spsc.c ../../../include/ck_ring.h $(CC) $(CFLAGS) -o ck_ring_spsc ck_ring_spsc.c \ @@ -17,6 +21,14 @@ ck_ring_spmc: ck_ring_spmc.c ../../../include/ck_ring.h $(CC) $(CFLAGS) -o ck_ring_spmc ck_ring_spmc.c \ ../../../src/ck_barrier_centralized.c +ck_ring_mpmc: ck_ring_mpmc.c ../../../include/ck_ring.h + $(CC) $(CFLAGS) -o ck_ring_mpmc ck_ring_mpmc.c \ + ../../../src/ck_barrier_centralized.c + +ck_ring_mpmc_template: ck_ring_mpmc_template.c ../../../include/ck_ring.h + $(CC) $(CFLAGS) -o ck_ring_mpmc_template ck_ring_mpmc_template.c \ + ../../../src/ck_barrier_centralized.c + ck_ring_spmc_template: ck_ring_spmc_template.c ../../../include/ck_ring.h $(CC) $(CFLAGS) -o ck_ring_spmc_template ck_ring_spmc_template.c \ ../../../src/ck_barrier_centralized.c diff --git a/regressions/ck_ring/validate/ck_ring_mpmc.c b/regressions/ck_ring/validate/ck_ring_mpmc.c new file mode 100644 index 0000000..66d7f39 --- /dev/null +++ b/regressions/ck_ring/validate/ck_ring_mpmc.c @@ -0,0 +1,448 @@ +/* + * Copyright 2011-2015 Samy Al Bahra. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include "../../common.h" + +#ifndef ITERATIONS +#define ITERATIONS 128 +#endif + +struct context { + unsigned int tid; + unsigned int previous; + unsigned int next; + ck_ring_buffer_t *buffer; +}; + +struct entry { + unsigned long value_long; + unsigned int magic; + unsigned int ref; + int tid; + int value; +}; + +static int nthr; +static ck_ring_t *ring; +static ck_ring_t ring_mpmc CK_CC_CACHELINE; +static ck_ring_t ring_mw CK_CC_CACHELINE; +static struct affinity a; +static int size; +static int eb; +static ck_barrier_centralized_t barrier = CK_BARRIER_CENTRALIZED_INITIALIZER; +static struct context *_context; + +static unsigned int global_counter; + +static void * +test_mpmc(void *c) +{ + unsigned int observed = 0; + unsigned int enqueue = 0; + unsigned int seed; + int i, k, j, tid; + struct context *context = c; + ck_ring_buffer_t *buffer; + unsigned int *csp; + + csp = malloc(sizeof(*csp) * nthr); + assert(csp != NULL); + + memset(csp, 0, sizeof(*csp) * nthr); + + buffer = context->buffer; + if (aff_iterate(&a)) { + perror("ERROR: Could not affine thread"); + exit(EXIT_FAILURE); + } + + tid = ck_pr_faa_int(&eb, 1); + ck_pr_fence_memory(); + while (ck_pr_load_int(&eb) != nthr - 1); + + for (i = 0; i < ITERATIONS; i++) { + for (j = 0; j < size; j++) { + struct entry *o = NULL; + int spin; + + /* Keep trying until we encounter at least one node. */ + if (j & 1) { + if (ck_ring_dequeue_mpmc(&ring_mw, buffer, &o) == false) + o = NULL; + } else { + if (ck_ring_trydequeue_mpmc(&ring_mw, buffer, &o) == false) + o = NULL; + } + + if (o == NULL) { + o = malloc(sizeof(*o)); + if (o == NULL) + continue; + + o->value_long = (unsigned long)ck_pr_faa_uint(&global_counter, 1) + 1; + + o->magic = 0xdead; + o->ref = 0; + o->tid = tid; + + if (ck_ring_enqueue_mpmc(&ring_mw, buffer, o) == false) { + free(o); + } else { + enqueue++; + } + + continue; + } + + observed++; + + if (o->magic != 0xdead) { + ck_error("[%p] (%x)\n", + (void *)o, o->magic); + } + + o->magic = 0xbeef; + + if (csp[o->tid] >= o->value_long) + ck_error("queue semantics violated: %lu <= %lu\n", o->value_long, csp[o->tid]); + + csp[o->tid] = o->value_long; + + if (ck_pr_faa_uint(&o->ref, 1) != 0) { + ck_error("[%p] We dequeued twice.\n", (void *)o); + } + + if ((i % 4) == 0) { + spin = common_rand_r(&seed) % 16384; + for (k = 0; k < spin; k++) { + ck_pr_stall(); + } + } + + free(o); + } + } + + fprintf(stderr, "[%d] dequeue=%u enqueue=%u\n", tid, observed, enqueue); + return NULL; +} + +static void * +test_spmc(void *c) +{ + unsigned int observed = 0; + unsigned long previous = 0; + unsigned int seed; + int i, k, j, tid; + struct context *context = c; + ck_ring_buffer_t *buffer; + + buffer = context->buffer; + if (aff_iterate(&a)) { + perror("ERROR: Could not affine thread"); + exit(EXIT_FAILURE); + } + + tid = ck_pr_faa_int(&eb, 1); + ck_pr_fence_memory(); + while (ck_pr_load_int(&eb) != nthr - 1); + + for (i = 0; i < ITERATIONS; i++) { + for (j = 0; j < size; j++) { + struct entry *o; + int spin; + + /* Keep trying until we encounter at least one node. */ + if (j & 1) { + while (ck_ring_dequeue_mpmc(&ring_mpmc, buffer, + &o) == false); + } else { + while (ck_ring_trydequeue_mpmc(&ring_mpmc, buffer, + &o) == false); + } + + observed++; + if (o->value < 0 + || o->value != o->tid + || o->magic != 0xdead + || (previous != 0 && previous >= o->value_long)) { + ck_error("[0x%p] (%x) (%d, %d) >< (0, %d)\n", + (void *)o, o->magic, o->tid, o->value, size); + } + + o->magic = 0xbeef; + o->value = -31337; + o->tid = -31338; + previous = o->value_long; + + if (ck_pr_faa_uint(&o->ref, 1) != 0) { + ck_error("[%p] We dequeued twice.\n", (void *)o); + } + + if ((i % 4) == 0) { + spin = common_rand_r(&seed) % 16384; + for (k = 0; k < spin; k++) { + ck_pr_stall(); + } + } + + free(o); + } + } + + fprintf(stderr, "[%d] Observed %u\n", tid, observed); + return NULL; +} + +static void * +test(void *c) +{ + struct context *context = c; + struct entry *entry; + unsigned int s; + int i, j; + bool r; + ck_ring_buffer_t *buffer = context->buffer; + ck_barrier_centralized_state_t sense = + CK_BARRIER_CENTRALIZED_STATE_INITIALIZER; + + if (aff_iterate(&a)) { + perror("ERROR: Could not affine thread"); + exit(EXIT_FAILURE); + } + + if (context->tid == 0) { + struct entry *entries; + + entries = malloc(sizeof(struct entry) * size); + assert(entries != NULL); + + if (ck_ring_size(ring) != 0) { + ck_error("More entries than expected: %u > 0\n", + ck_ring_size(ring)); + } + + for (i = 0; i < size; i++) { + entries[i].value = i; + entries[i].tid = 0; + + if (true) { + r = ck_ring_enqueue_mpmc(ring, buffer, + entries + i); + } else { + r = ck_ring_enqueue_mpmc_size(ring, buffer, + entries + i, &s); + + if ((int)s != i) { + ck_error("Size is %u, expected %d.\n", + s, size); + } + } + + assert(r != false); + } + + if (ck_ring_size(ring) != (unsigned int)size) { + ck_error("Less entries than expected: %u < %d\n", + ck_ring_size(ring), size); + } + + if (ck_ring_capacity(ring) != ck_ring_size(ring) + 1) { + ck_error("Capacity less than expected: %u < %u\n", + ck_ring_size(ring), ck_ring_capacity(ring)); + } + } + + /* + * Wait for all threads. The idea here is to maximize the contention. + */ + ck_barrier_centralized(&barrier, &sense, nthr); + + for (i = 0; i < ITERATIONS; i++) { + for (j = 0; j < size; j++) { + buffer = _context[context->previous].buffer; + while (ck_ring_dequeue_mpmc(ring + context->previous, + buffer, &entry) == false); + + if (context->previous != (unsigned int)entry->tid) { + ck_error("[%u:%p] %u != %u\n", + context->tid, (void *)entry, entry->tid, context->previous); + } + + if (entry->value < 0 || entry->value >= size) { + ck_error("[%u:%p] %u %u\n", + context->tid, (void *)entry, entry->tid, context->previous); + } + + entry->tid = context->tid; + buffer = context->buffer; + + if (true) { + r = ck_ring_enqueue_mpmc(ring + context->tid, + buffer, entry); + } else { + r = ck_ring_enqueue_mpmc_size(ring + context->tid, + buffer, entry, &s); + + if ((int)s >= size) { + ck_error("Size %u out of range of %d\n", + s, size); + } + } + assert(r == true); + } + } + + return NULL; +} + +int +main(int argc, char *argv[]) +{ + int i, r; + unsigned long l; + pthread_t *thread; + ck_ring_buffer_t *buffer; + + if (argc != 4) { + ck_error("Usage: validate \n"); + } + + a.request = 0; + a.delta = atoi(argv[2]); + + nthr = atoi(argv[1]); + assert(nthr >= 1); + + size = atoi(argv[3]); + assert(size >= 4 && (size & size - 1) == 0); + size -= 1; + + ring = malloc(sizeof(ck_ring_t) * nthr); + assert(ring); + + _context = malloc(sizeof(*_context) * nthr); + assert(_context); + + thread = malloc(sizeof(pthread_t) * nthr); + assert(thread); + fprintf(stderr, "SPSC test:"); + for (i = 0; i < nthr; i++) { + _context[i].tid = i; + if (i == 0) { + _context[i].previous = nthr - 1; + _context[i].next = i + 1; + } else if (i == nthr - 1) { + _context[i].next = 0; + _context[i].previous = i - 1; + } else { + _context[i].next = i + 1; + _context[i].previous = i - 1; + } + + buffer = malloc(sizeof(ck_ring_buffer_t) * (size + 1)); + assert(buffer); + memset(buffer, 0, sizeof(ck_ring_buffer_t) * (size + 1)); + _context[i].buffer = buffer; + ck_ring_init(ring + i, size + 1); + r = pthread_create(thread + i, NULL, test, _context + i); + assert(r == 0); + } + + for (i = 0; i < nthr; i++) + pthread_join(thread[i], NULL); + + fprintf(stderr, " done\n"); + + fprintf(stderr, "SPMC test:\n"); + buffer = malloc(sizeof(ck_ring_buffer_t) * (size + 1)); + assert(buffer); + memset(buffer, 0, sizeof(void *) * (size + 1)); + ck_ring_init(&ring_mpmc, size + 1); + for (i = 0; i < nthr - 1; i++) { + _context[i].buffer = buffer; + r = pthread_create(thread + i, NULL, test_spmc, _context + i); + assert(r == 0); + } + + for (l = 0; l < (unsigned long)size * ITERATIONS * (nthr - 1) ; l++) { + struct entry *entry = malloc(sizeof *entry); + + assert(entry != NULL); + entry->value_long = l; + entry->value = (int)l; + entry->tid = (int)l; + entry->magic = 0xdead; + entry->ref = 0; + + /* Wait until queue is not full. */ + if (l & 1) { + while (ck_ring_enqueue_mpmc(&ring_mpmc, + buffer, + entry) == false) + ck_pr_stall(); + } else { + unsigned int s; + + while (ck_ring_enqueue_mpmc_size(&ring_mpmc, + buffer, entry, &s) == false) { + ck_pr_stall(); + } + + if ((int)s >= (size * ITERATIONS * (nthr - 1))) { + ck_error("MPMC: Unexpected size of %u\n", s); + } + } + } + + for (i = 0; i < nthr - 1; i++) + pthread_join(thread[i], NULL); + ck_pr_store_int(&eb, 0); + fprintf(stderr, "MPMC test:\n"); + buffer = malloc(sizeof(ck_ring_buffer_t) * (size + 1)); + assert(buffer); + memset(buffer, 0, sizeof(void *) * (size + 1)); + ck_ring_init(&ring_mw, size + 1); + for (i = 0; i < nthr - 1; i++) { + _context[i].buffer = buffer; + r = pthread_create(thread + i, NULL, test_mpmc, _context + i); + assert(r == 0); + } + + for (i = 0; i < nthr - 1; i++) + pthread_join(thread[i], NULL); + + return (0); +} diff --git a/regressions/ck_ring/validate/ck_ring_mpmc_template.c b/regressions/ck_ring/validate/ck_ring_mpmc_template.c new file mode 100644 index 0000000..f076e9a --- /dev/null +++ b/regressions/ck_ring/validate/ck_ring_mpmc_template.c @@ -0,0 +1,349 @@ +/* + * Copyright 2011-2015 Samy Al Bahra. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include "../../common.h" + +#ifndef ITERATIONS +#define ITERATIONS 128 +#endif + +struct context { + unsigned int tid; + unsigned int previous; + unsigned int next; + struct entry **buffer; +}; + +struct entry { + unsigned long value_long; + unsigned int magic; + unsigned int ref; + int tid; + int value; +}; + +CK_RING_PROTOTYPE(entry, entry *) + +static int nthr; +static ck_ring_t *ring; +static ck_ring_t ring_spmc CK_CC_CACHELINE; +static struct affinity a; +static int size; +static int eb; +static ck_barrier_centralized_t barrier = CK_BARRIER_CENTRALIZED_INITIALIZER; +static struct context *_context; + +static void * +test_spmc(void *c) +{ + unsigned int observed = 0; + unsigned long previous = 0; + unsigned int seed; + int i, k, j, tid; + struct context *context = c; + struct entry **buffer; + + buffer = context->buffer; + if (aff_iterate(&a)) { + perror("ERROR: Could not affine thread"); + exit(EXIT_FAILURE); + } + + tid = ck_pr_faa_int(&eb, 1); + ck_pr_fence_memory(); + while (ck_pr_load_int(&eb) != nthr - 1); + + for (i = 0; i < ITERATIONS; i++) { + for (j = 0; j < size; j++) { + struct entry *o; + int spin; + + /* Keep trying until we encounter at least one node. */ + if (j & 1) { + while (CK_RING_DEQUEUE_MPMC(entry, + &ring_spmc, buffer, &o) == false); + } else { + while (CK_RING_TRYDEQUEUE_MPMC(entry, + &ring_spmc, buffer, &o) == false); + } + + observed++; + if (o->value < 0 + || o->value != o->tid + || o->magic != 0xdead + || (previous != 0 && previous >= o->value_long)) { + ck_error("[0x%p] (%x) (%d, %d) >< (0, %d)\n", + (void *)o, o->magic, o->tid, o->value, size); + } + + o->magic = 0xbeef; + o->value = -31337; + o->tid = -31338; + previous = o->value_long; + + if (ck_pr_faa_uint(&o->ref, 1) != 0) { + ck_error("[%p] We dequeued twice.\n", (void *)o); + } + + if ((i % 4) == 0) { + spin = common_rand_r(&seed) % 16384; + for (k = 0; k < spin; k++) { + ck_pr_stall(); + } + } + + free(o); + } + } + + fprintf(stderr, "[%d] Observed %u\n", tid, observed); + return NULL; +} + +static void * +test(void *c) +{ + struct context *context = c; + struct entry *entry; + unsigned int s; + int i, j; + bool r; + struct entry **buffer = context->buffer; + ck_barrier_centralized_state_t sense = + CK_BARRIER_CENTRALIZED_STATE_INITIALIZER; + + if (aff_iterate(&a)) { + perror("ERROR: Could not affine thread"); + exit(EXIT_FAILURE); + } + + if (context->tid == 0) { + struct entry **entries; + + entries = malloc(sizeof(struct entry *) * size); + assert(entries != NULL); + + if (ck_ring_size(ring) != 0) { + ck_error("More entries than expected: %u > 0\n", + ck_ring_size(ring)); + } + + for (i = 0; i < size; i++) { + entries[i] = malloc(sizeof(struct entry)); + assert(entries[i] != NULL); + + entries[i]->value = i; + entries[i]->tid = 0; + + if (i & 1) { + r = CK_RING_ENQUEUE_MPMC(entry, ring, buffer, + &entries[i]); + } else { + r = CK_RING_ENQUEUE_MPMC_SIZE(entry, ring, + buffer, &entries[i], &s); + + if ((int)s != i) { + ck_error("Size is %u, expected %d.\n", + s, size); + } + } + + assert(r != false); + } + + if (ck_ring_size(ring) != (unsigned int)size) { + ck_error("Less entries than expected: %u < %d\n", + ck_ring_size(ring), size); + } + + if (ck_ring_capacity(ring) != ck_ring_size(ring) + 1) { + ck_error("Capacity less than expected: %u < %u\n", + ck_ring_size(ring), ck_ring_capacity(ring)); + } + } + + /* + * Wait for all threads. The idea here is to maximize the contention. + */ + ck_barrier_centralized(&barrier, &sense, nthr); + + for (i = 0; i < ITERATIONS; i++) { + for (j = 0; j < size; j++) { + buffer = _context[context->previous].buffer; + while (CK_RING_DEQUEUE_MPMC(entry, + ring + context->previous, + buffer, &entry) == false); + + if (context->previous != (unsigned int)entry->tid) { + ck_error("[%u:%p] %u != %u\n", + context->tid, (void *)entry, + entry->tid, context->previous); + } + + if (entry->value < 0 || entry->value >= size) { + ck_error("[%u:%p] %u %u\n", + context->tid, (void *)entry, + entry->tid, context->previous); + } + + entry->tid = context->tid; + buffer = context->buffer; + + if (i & 1) { + r = CK_RING_ENQUEUE_MPMC(entry, + ring + context->tid, + buffer, &entry); + } else { + r = CK_RING_ENQUEUE_MPMC_SIZE(entry, + ring + context->tid, + buffer, &entry, &s); + + if ((int)s >= size) { + ck_error("Size %u out of range of %d\n", + s, size); + } + } + assert(r == true); + } + } + + return NULL; +} + +int +main(int argc, char *argv[]) +{ + int i, r; + unsigned long l; + pthread_t *thread; + struct entry **buffer; + + if (argc != 4) { + ck_error("Usage: validate \n"); + } + + a.request = 0; + a.delta = atoi(argv[2]); + + nthr = atoi(argv[1]); + assert(nthr >= 1); + + size = atoi(argv[3]); + assert(size >= 4 && (size & size - 1) == 0); + size -= 1; + + ring = malloc(sizeof(ck_ring_t) * nthr); + assert(ring); + + _context = malloc(sizeof(*_context) * nthr); + assert(_context); + + thread = malloc(sizeof(pthread_t) * nthr); + assert(thread); + + fprintf(stderr, "SPSC test:"); + for (i = 0; i < nthr; i++) { + _context[i].tid = i; + if (i == 0) { + _context[i].previous = nthr - 1; + _context[i].next = i + 1; + } else if (i == nthr - 1) { + _context[i].next = 0; + _context[i].previous = i - 1; + } else { + _context[i].next = i + 1; + _context[i].previous = i - 1; + } + + buffer = malloc(sizeof(struct entry *) * (size + 1)); + assert(buffer); + memset(buffer, 0, sizeof(struct entry *) * (size + 1)); + _context[i].buffer = buffer; + ck_ring_init(ring + i, size + 1); + r = pthread_create(thread + i, NULL, test, _context + i); + assert(r == 0); + } + + for (i = 0; i < nthr; i++) + pthread_join(thread[i], NULL); + + fprintf(stderr, " done\n"); + + fprintf(stderr, "MPMC test:\n"); + buffer = malloc(sizeof(struct entry *) * (size + 1)); + assert(buffer); + memset(buffer, 0, sizeof(struct entry *) * (size + 1)); + ck_ring_init(&ring_spmc, size + 1); + for (i = 0; i < nthr - 1; i++) { + _context[i].buffer = buffer; + r = pthread_create(thread + i, NULL, test_spmc, _context + i); + assert(r == 0); + } + + for (l = 0; l < (unsigned long)size * ITERATIONS * (nthr - 1) ; l++) { + struct entry *entry = malloc(sizeof *entry); + + assert(entry != NULL); + entry->value_long = l; + entry->value = (int)l; + entry->tid = (int)l; + entry->magic = 0xdead; + entry->ref = 0; + + /* Wait until queue is not full. */ + if (l & 1) { + while (CK_RING_ENQUEUE_MPMC(entry, &ring_spmc, + buffer, &entry) == false) { + ck_pr_stall(); + } + } else { + unsigned int s; + + while (CK_RING_ENQUEUE_MPMC_SIZE(entry, &ring_spmc, + buffer, &entry, &s) == false) { + ck_pr_stall(); + } + + if ((int)s >= (size * ITERATIONS * (nthr - 1))) { + ck_error("MPMC: Unexpected size of %u\n", s); + } + } + } + + for (i = 0; i < nthr - 1; i++) + pthread_join(thread[i], NULL); + + return 0; +}