From 3edb523da5f6c9dcea41879ceb6643dcc7bde305 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Wed, 11 Dec 2013 21:28:44 +0100 Subject: [PATCH] ck_ring: Move the ring buffer outside of the ck_ring_t Remove the ring buffer from the struct ck_ring, it is now required to explicitely pass the ring buffer for enqueue/dequeue. That can be useful for systems with multiple address spaces. --- include/ck_ring.h | 225 +++----------------- regressions/ck_ring/benchmark/latency.c | 39 ++-- regressions/ck_ring/validate/Makefile | 15 +- regressions/ck_ring/validate/ck_ring_spmc.c | 65 +++--- regressions/ck_ring/validate/ck_ring_spsc.c | 42 ++-- 5 files changed, 115 insertions(+), 271 deletions(-) diff --git a/include/ck_ring.h b/include/ck_ring.h index 76d29ad..00831bf 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -36,187 +36,6 @@ /* * Concurrent ring buffer. */ -#define CK_RING(type, name) \ - struct ck_ring_##name { \ - unsigned int c_head; \ - char pad[CK_MD_CACHELINE - sizeof(unsigned int)]; \ - unsigned int p_tail; \ - char _pad[CK_MD_CACHELINE - sizeof(unsigned int)]; \ - unsigned int size; \ - unsigned int mask; \ - struct type *ring; \ - }; \ - CK_CC_INLINE static void \ - ck_ring_init_##name(struct ck_ring_##name *ring, \ - struct type *buffer, \ - unsigned int size) \ - { \ - \ - ring->size = size; \ - ring->mask = size - 1; \ - ring->p_tail = 0; \ - ring->c_head = 0; \ - ring->ring = buffer; \ - return; \ - } \ - CK_CC_INLINE static unsigned int \ - ck_ring_size_##name(struct ck_ring_##name *ring) \ - { \ - unsigned int c, p; \ - \ - c = ck_pr_load_uint(&ring->c_head); \ - p = ck_pr_load_uint(&ring->p_tail); \ - return (p - c) & ring->mask; \ - } \ - CK_CC_INLINE static unsigned int \ - ck_ring_capacity_##name(struct ck_ring_##name *ring) \ - { \ - \ - return ring->size; \ - } \ - CK_CC_INLINE static bool \ - ck_ring_enqueue_spsc_size_##name(struct ck_ring_##name *ring, \ - struct type *entry, \ - unsigned int *size) \ - { \ - unsigned int consumer, producer, delta; \ - unsigned int mask = ring->mask; \ - \ - consumer = ck_pr_load_uint(&ring->c_head); \ - producer = ring->p_tail; \ - delta = producer + 1; \ - *size = (producer - consumer) & mask; \ - \ - if ((delta & mask) == (consumer & mask)) \ - return false; \ - \ - ring->ring[producer & mask] = *entry; \ - ck_pr_fence_store(); \ - ck_pr_store_uint(&ring->p_tail, delta); \ - return true; \ - } \ - CK_CC_INLINE static bool \ - ck_ring_enqueue_spsc_##name(struct ck_ring_##name *ring, \ - struct type *entry) \ - { \ - unsigned int consumer, producer, delta; \ - unsigned int mask = ring->mask; \ - \ - consumer = ck_pr_load_uint(&ring->c_head); \ - producer = ring->p_tail; \ - delta = producer + 1; \ - \ - if ((delta & mask) == (consumer & mask)) \ - return false; \ - \ - ring->ring[producer & mask] = *entry; \ - ck_pr_fence_store(); \ - ck_pr_store_uint(&ring->p_tail, delta); \ - return true; \ - } \ - CK_CC_INLINE static bool \ - ck_ring_dequeue_spsc_##name(struct ck_ring_##name *ring, \ - struct type *data) \ - { \ - unsigned int consumer, producer; \ - unsigned int mask = ring->mask; \ - \ - consumer = ring->c_head; \ - producer = ck_pr_load_uint(&ring->p_tail); \ - \ - if (consumer == producer) \ - return false; \ - \ - ck_pr_fence_load(); \ - *data = ring->ring[consumer & mask]; \ - ck_pr_fence_store(); \ - ck_pr_store_uint(&ring->c_head, consumer + 1); \ - \ - return true; \ - } \ - CK_CC_INLINE static bool \ - ck_ring_enqueue_spmc_size_##name(struct ck_ring_##name *ring, \ - void *entry, unsigned int *size) \ - { \ - \ - return ck_ring_enqueue_spsc_size_##name(ring, entry, size); \ - } \ - CK_CC_INLINE static bool \ - ck_ring_enqueue_spmc_##name(struct ck_ring_##name *ring, void *entry) \ - { \ - \ - return ck_ring_enqueue_spsc_##name(ring, entry); \ - } \ - CK_CC_INLINE static bool \ - ck_ring_trydequeue_spmc_##name(struct ck_ring_##name *ring, \ - struct type *data) \ - { \ - unsigned int consumer, producer; \ - unsigned int mask = ring->mask; \ - \ - consumer = ck_pr_load_uint(&ring->c_head); \ - ck_pr_fence_load(); \ - producer = ck_pr_load_uint(&ring->p_tail); \ - \ - if (consumer == producer) \ - return false; \ - \ - ck_pr_fence_load(); \ - *data = ring->ring[consumer & mask]; \ - ck_pr_fence_memory(); \ - return ck_pr_cas_uint(&ring->c_head, \ - consumer, \ - consumer + 1); \ - } \ - CK_CC_INLINE static bool \ - ck_ring_dequeue_spmc_##name(struct ck_ring_##name *ring, \ - struct type *data) \ - { \ - unsigned int consumer, producer; \ - unsigned int mask = ring->mask; \ - \ - consumer = ck_pr_load_uint(&ring->c_head); \ - do { \ - ck_pr_fence_load(); \ - producer = ck_pr_load_uint(&ring->p_tail); \ - \ - if (consumer == producer) \ - return false; \ - \ - ck_pr_fence_load(); \ - *data = ring->ring[consumer & mask]; \ - ck_pr_fence_memory(); \ - } while (ck_pr_cas_uint_value(&ring->c_head, \ - consumer, \ - consumer + 1, \ - &consumer) == false); \ - \ - return true; \ - } - - -#define CK_RING_INSTANCE(name) \ - struct ck_ring_##name -#define CK_RING_INIT(name, object, buffer, size) \ - ck_ring_init_##name(object, buffer, size) -#define CK_RING_SIZE(name, object) \ - ck_ring_size_##name(object) -#define CK_RING_CAPACITY(name, object) \ - ck_ring_capacity_##name(object) -#define CK_RING_ENQUEUE_SPSC_SIZE(name, object, value, s) \ - ck_ring_enqueue_spsc_size_##name(object, value, s) -#define CK_RING_ENQUEUE_SPSC(name, object, value) \ - ck_ring_enqueue_spsc_##name(object, value) -#define CK_RING_DEQUEUE_SPSC(name, object, value) \ - ck_ring_dequeue_spsc_##name(object, value) -#define CK_RING_DEQUEUE_SPMC(name, object, value) \ - ck_ring_dequeue_spmc_##name(object, value) -#define CK_RING_TRYDEQUEUE_SPMC(name, object, value) \ - ck_ring_trydequeue_spmc_##name(object, value) -#define CK_RING_ENQUEUE_SPMC_SIZE(name, object, value, s) \ - ck_ring_enqueue_spmc_size_##name(object, value, s) -#define CK_RING_ENQUEUE_SPMC(name, object, value) \ - ck_ring_enqueue_spmc_##name(object, value) struct ck_ring { unsigned int c_head; @@ -225,7 +44,6 @@ struct ck_ring { char _pad[CK_MD_CACHELINE - sizeof(unsigned int)]; unsigned int size; unsigned int mask; - void **ring; }; typedef struct ck_ring ck_ring_t; @@ -246,6 +64,12 @@ ck_ring_capacity(struct ck_ring *ring) return ring->size; } +struct ck_ring_buffer { + void *ring; +}; + +typedef struct ck_ring_buffer ck_ring_buffer_t; + /* * Atomically enqueues the specified entry. Returns true on success, returns * false if the ck_ring is full. This operation only support one active @@ -258,12 +82,13 @@ ck_ring_capacity(struct ck_ring *ring) * writer. */ CK_CC_INLINE static bool -ck_ring_enqueue_spsc_size(struct ck_ring *ring, +ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry, unsigned int *size) { unsigned int consumer, producer, delta; unsigned int mask = ring->mask; + void **ring_buf = buf.ring; consumer = ck_pr_load_uint(&ring->c_head); producer = ring->p_tail; @@ -273,7 +98,7 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, if ((delta & mask) == (consumer & mask)) return false; - ring->ring[producer & mask] = entry; + ring_buf[producer & mask] = entry; /* * Make sure to update slot value before indicating @@ -291,10 +116,11 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring, * of ck_ring_dequeue_spsc. */ CK_CC_INLINE static bool -ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry) +ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry) { unsigned int consumer, producer, delta; unsigned int mask = ring->mask; + void **ring_buf = buf.ring; consumer = ck_pr_load_uint(&ring->c_head); producer = ring->p_tail; @@ -303,7 +129,7 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry) if ((delta & mask) == (consumer & mask)) return false; - ring->ring[producer & mask] = entry; + ring_buf[producer & mask] = entry; /* * Make sure to update slot value before indicating @@ -318,10 +144,11 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry) * Single consumer and single producer ring buffer dequeue (consumer). */ CK_CC_INLINE static bool -ck_ring_dequeue_spsc(struct ck_ring *ring, void *data) +ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data) { unsigned int consumer, producer; unsigned int mask = ring->mask; + void **ring_buf = buf.ring; consumer = ring->c_head; producer = ck_pr_load_uint(&ring->p_tail); @@ -342,7 +169,7 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, void *data) * troublesome on platforms where sizeof(void *) * is not guaranteed to be sizeof(T *). */ - ck_pr_store_ptr(data, ring->ring[consumer & mask]); + ck_pr_store_ptr(data, ring_buf[consumer & mask]); ck_pr_fence_store(); ck_pr_store_uint(&ring->c_head, consumer + 1); return true; @@ -360,12 +187,12 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, void *data) * writer. */ CK_CC_INLINE static bool -ck_ring_enqueue_spmc_size(struct ck_ring *ring, +ck_ring_enqueue_spmc_size(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry, unsigned int *size) { - return ck_ring_enqueue_spsc_size(ring, entry, size); + return ck_ring_enqueue_spsc_size(ring, buf, entry, size); } /* @@ -375,17 +202,18 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring, * invocations of ck_ring_dequeue_spmc. */ CK_CC_INLINE static bool -ck_ring_enqueue_spmc(struct ck_ring *ring, void *entry) +ck_ring_enqueue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry) { - return ck_ring_enqueue_spsc(ring, entry); + return ck_ring_enqueue_spsc(ring, buf, entry); } CK_CC_INLINE static bool -ck_ring_trydequeue_spmc(struct ck_ring *ring, void *data) +ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data) { unsigned int consumer, producer; unsigned int mask = ring->mask; + void **ring_buf = buf.ring; consumer = ck_pr_load_uint(&ring->c_head); ck_pr_fence_load(); @@ -395,18 +223,19 @@ ck_ring_trydequeue_spmc(struct ck_ring *ring, void *data) return false; ck_pr_fence_load(); - ck_pr_store_ptr(data, ring->ring[consumer & mask]); + ck_pr_store_ptr(data, ring_buf[consumer & mask]); ck_pr_fence_memory(); return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1); } CK_CC_INLINE static bool -ck_ring_dequeue_spmc(struct ck_ring *ring, void *data) +ck_ring_dequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data) { unsigned int consumer, producer; unsigned int mask = ring->mask; void *r; + void **ring_buf = buf.ring; consumer = ck_pr_load_uint(&ring->c_head); @@ -430,7 +259,7 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, void *data) * volatile load to force volatile semantics while allowing * for r itself to remain aliased across the loop. */ - r = ck_pr_load_ptr(&ring->ring[consumer & mask]); + r = ck_pr_load_ptr(&ring_buf[consumer & mask]); /* Serialize load with respect to head update. */ ck_pr_fence_memory(); @@ -448,15 +277,13 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, void *data) } CK_CC_INLINE static void -ck_ring_init(struct ck_ring *ring, void *buffer, unsigned int size) +ck_ring_init(struct ck_ring *ring, unsigned int size) { - memset(buffer, 0, sizeof(void *) * size); ring->size = size; ring->mask = size - 1; ring->p_tail = 0; ring->c_head = 0; - ring->ring = buffer; return; } diff --git a/regressions/ck_ring/benchmark/latency.c b/regressions/ck_ring/benchmark/latency.c index 332e436..bf055ac 100644 --- a/regressions/ck_ring/benchmark/latency.c +++ b/regressions/ck_ring/benchmark/latency.c @@ -14,8 +14,6 @@ struct entry { int tid; int value; }; -CK_RING(entry, entry_ring) -static CK_RING_INSTANCE(entry_ring) ring; int main(int argc, char *argv[]) @@ -24,6 +22,8 @@ main(int argc, char *argv[]) uint64_t s, e, e_a, d_a; struct entry *buffer; struct entry entry = {0, 0}; + ck_ring_buffer_t buf; + ck_ring_t ring; if (argc != 2) { ck_error("Usage: latency \n"); @@ -39,26 +39,27 @@ main(int argc, char *argv[]) ck_error("ERROR: Failed to allocate buffer\n"); } - CK_RING_INIT(entry_ring, &ring, buffer, size); + ck_ring_init(&ring, size); + buf.ring = buffer; e_a = d_a = s = e = 0; for (r = 0; r < ITERATIONS; r++) { for (i = 0; i < size / 4; i += 4) { s = rdtsc(); - CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry); - CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry); - CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry); - CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry); + ck_ring_enqueue_spsc(&ring, buf, &entry); + ck_ring_enqueue_spsc(&ring, buf, &entry); + ck_ring_enqueue_spsc(&ring, buf, &entry); + ck_ring_enqueue_spsc(&ring, buf, &entry); e = rdtsc(); } e_a += (e - s) / 4; for (i = 0; i < size / 4; i += 4) { s = rdtsc(); - CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry); - CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry); - CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry); - CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry); + ck_ring_dequeue_spsc(&ring, buf, &entry); + ck_ring_dequeue_spsc(&ring, buf, &entry); + ck_ring_dequeue_spsc(&ring, buf, &entry); + ck_ring_dequeue_spsc(&ring, buf, &entry); e = rdtsc(); } d_a += (e - s) / 4; @@ -70,20 +71,20 @@ main(int argc, char *argv[]) for (r = 0; r < ITERATIONS; r++) { for (i = 0; i < size / 4; i += 4) { s = rdtsc(); - CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry); - CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry); - CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry); - CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry); + ck_ring_enqueue_spmc(&ring, buf, &entry); + ck_ring_enqueue_spmc(&ring, buf, &entry); + ck_ring_enqueue_spmc(&ring, buf, &entry); + ck_ring_enqueue_spmc(&ring, buf, &entry); e = rdtsc(); } e_a += (e - s) / 4; for (i = 0; i < size / 4; i += 4) { s = rdtsc(); - CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry); - CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry); - CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry); - CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry); + ck_ring_dequeue_spmc(&ring, buf, &entry); + ck_ring_dequeue_spmc(&ring, buf, &entry); + ck_ring_dequeue_spmc(&ring, buf, &entry); + ck_ring_dequeue_spmc(&ring, buf, &entry); e = rdtsc(); } d_a += (e - s) / 4; diff --git a/regressions/ck_ring/validate/Makefile b/regressions/ck_ring/validate/Makefile index 83d4273..8d72995 100644 --- a/regressions/ck_ring/validate/Makefile +++ b/regressions/ck_ring/validate/Makefile @@ -1,30 +1,21 @@ .PHONY: check clean distribution -OBJECTS=ck_ring_spsc ck_ring_spsc_template ck_ring_spmc ck_ring_spmc_template +OBJECTS=ck_ring_spsc ck_ring_spmc SIZE=16384 +CFLAGS += -g2 all: $(OBJECTS) check: all ./ck_ring_spsc $(CORES) 1 $(SIZE) - ./ck_ring_spsc_template $(CORES) 1 $(SIZE) ./ck_ring_spmc $(CORES) 1 $(SIZE) - ./ck_ring_spmc_template $(CORES) 1 $(SIZE) - -ck_ring_spsc_template: ck_ring_spsc_template.c ../../../include/ck_ring.h - $(CC) $(CFLAGS) -o ck_ring_spsc_template ck_ring_spsc_template.c \ - ../../../src/ck_barrier_centralized.c - -ck_ring_spmc_template: ck_ring_spmc_template.c ../../../include/ck_ring.h - $(CC) $(CFLAGS) -o ck_ring_spmc_template ck_ring_spmc_template.c \ - ../../../src/ck_barrier_centralized.c ck_ring_spsc: ck_ring_spsc.c ../../../include/ck_ring.h $(CC) $(CFLAGS) -o ck_ring_spsc ck_ring_spsc.c \ ../../../src/ck_barrier_centralized.c ck_ring_spmc: ck_ring_spmc.c ../../../include/ck_ring.h - $(CC) $(CFLAGS) -o ck_ring_spmc ck_ring_spmc.c \ + $(CC) $(CFLAGS) -g2 -o ck_ring_spmc ck_ring_spmc.c \ ../../../src/ck_barrier_centralized.c clean: diff --git a/regressions/ck_ring/validate/ck_ring_spmc.c b/regressions/ck_ring/validate/ck_ring_spmc.c index 75b8d18..062b7b0 100644 --- a/regressions/ck_ring/validate/ck_ring_spmc.c +++ b/regressions/ck_ring/validate/ck_ring_spmc.c @@ -43,6 +43,7 @@ struct context { unsigned int tid; unsigned int previous; unsigned int next; + void *buffer; }; struct entry { @@ -60,6 +61,7 @@ static struct affinity a; static int size; static int eb; static ck_barrier_centralized_t barrier = CK_BARRIER_CENTRALIZED_INITIALIZER; +static struct context *_context; static void * test_spmc(void *c) @@ -68,8 +70,10 @@ test_spmc(void *c) unsigned long previous = 0; unsigned int seed; int i, k, j, tid; + struct context *context = c; + ck_ring_buffer_t buf; - (void)c; + buf.ring = context->buffer; if (aff_iterate(&a)) { perror("ERROR: Could not affine thread"); exit(EXIT_FAILURE); @@ -86,9 +90,11 @@ test_spmc(void *c) /* Keep trying until we encounter at least one node. */ if (j & 1) { - while (ck_ring_dequeue_spmc(&ring_spmc, &o) == false); + while (ck_ring_dequeue_spmc(&ring_spmc, buf, + &o) == false); } else { - while (ck_ring_trydequeue_spmc(&ring_spmc, &o) == false); + while (ck_ring_trydequeue_spmc(&ring_spmc, buf, + &o) == false); } observed++; @@ -132,9 +138,12 @@ test(void *c) unsigned int s; int i, j; bool r; + ck_ring_buffer_t buf; ck_barrier_centralized_state_t sense = CK_BARRIER_CENTRALIZED_STATE_INITIALIZER; + buf.ring = context->buffer; + if (aff_iterate(&a)) { perror("ERROR: Could not affine thread"); exit(EXIT_FAILURE); @@ -156,9 +165,10 @@ test(void *c) entries[i].tid = 0; if (i & 1) { - r = ck_ring_enqueue_spmc(ring, entries + i); + r = ck_ring_enqueue_spmc(ring, buf, + entries + i); } else { - r = ck_ring_enqueue_spmc_size(ring, + r = ck_ring_enqueue_spmc_size(ring, buf, entries + i, &s); if ((int)s != i) { @@ -188,7 +198,9 @@ test(void *c) for (i = 0; i < ITERATIONS; i++) { for (j = 0; j < size; j++) { - while (ck_ring_dequeue_spmc(ring + context->previous, &entry) == false); + buf.ring = _context[context->previous].buffer; + while (ck_ring_dequeue_spmc(ring + context->previous, + buf, &entry) == false); if (context->previous != (unsigned int)entry->tid) { ck_error("[%u:%p] %u != %u\n", @@ -201,13 +213,14 @@ test(void *c) } entry->tid = context->tid; + buf.ring = context->buffer; if (i & 1) { r = ck_ring_enqueue_spmc(ring + context->tid, - entry); + buf, entry); } else { r = ck_ring_enqueue_spmc_size(ring + context->tid, - entry, &s); + buf, entry, &s); if ((int)s >= size) { ck_error("Size %u out of range of %d\n", @@ -227,8 +240,8 @@ main(int argc, char *argv[]) int i, r; void *buffer; unsigned long l; - struct context *context; pthread_t *thread; + ck_ring_buffer_t buf; if (argc != 4) { ck_error("Usage: validate \n"); @@ -247,31 +260,32 @@ main(int argc, char *argv[]) ring = malloc(sizeof(ck_ring_t) * nthr); assert(ring); - context = malloc(sizeof(*context) * nthr); - assert(context); + _context = malloc(sizeof(*_context) * nthr); + assert(_context); thread = malloc(sizeof(pthread_t) * nthr); assert(thread); fprintf(stderr, "SPSC test:"); for (i = 0; i < nthr; i++) { - context[i].tid = i; + _context[i].tid = i; if (i == 0) { - context[i].previous = nthr - 1; - context[i].next = i + 1; + _context[i].previous = nthr - 1; + _context[i].next = i + 1; } else if (i == nthr - 1) { - context[i].next = 0; - context[i].previous = i - 1; + _context[i].next = 0; + _context[i].previous = i - 1; } else { - context[i].next = i + 1; - context[i].previous = i - 1; + _context[i].next = i + 1; + _context[i].previous = i - 1; } buffer = malloc(sizeof(void *) * (size + 1)); assert(buffer); memset(buffer, 0, sizeof(void *) * (size + 1)); - ck_ring_init(ring + i, buffer, size + 1); - r = pthread_create(thread + i, NULL, test, context + i); + _context[i].buffer = buffer; + ck_ring_init(ring + i, size + 1); + r = pthread_create(thread + i, NULL, test, _context + i); assert(r == 0); } @@ -284,9 +298,11 @@ main(int argc, char *argv[]) buffer = malloc(sizeof(void *) * (size + 1)); assert(buffer); memset(buffer, 0, sizeof(void *) * (size + 1)); - ck_ring_init(&ring_spmc, buffer, size + 1); + ck_ring_init(&ring_spmc, size + 1); + buf.ring = buffer; for (i = 0; i < nthr - 1; i++) { - r = pthread_create(thread + i, NULL, test_spmc, context + i); + _context[i].buffer = buffer; + r = pthread_create(thread + i, NULL, test_spmc, _context + i); assert(r == 0); } @@ -302,13 +318,14 @@ main(int argc, char *argv[]) /* Wait until queue is not full. */ if (l & 1) { - while (ck_ring_enqueue_spmc(&ring_spmc, entry) == false) + while (ck_ring_enqueue_spmc(&ring_spmc, buf, + entry) == false) ck_pr_stall(); } else { unsigned int s; while (ck_ring_enqueue_spmc_size(&ring_spmc, - entry, &s) == false) { + buf, entry, &s) == false) { ck_pr_stall(); } diff --git a/regressions/ck_ring/validate/ck_ring_spsc.c b/regressions/ck_ring/validate/ck_ring_spsc.c index 9dbb079..587cd07 100644 --- a/regressions/ck_ring/validate/ck_ring_spsc.c +++ b/regressions/ck_ring/validate/ck_ring_spsc.c @@ -41,6 +41,7 @@ struct context { unsigned int tid; unsigned int previous; unsigned int next; + void *buffer; }; struct entry { @@ -53,6 +54,7 @@ static ck_ring_t *ring; static struct affinity a; static int size; static ck_barrier_centralized_t barrier = CK_BARRIER_CENTRALIZED_INITIALIZER; +static struct context *_context; static void * test(void *c) @@ -64,12 +66,14 @@ test(void *c) bool r; ck_barrier_centralized_state_t sense = CK_BARRIER_CENTRALIZED_STATE_INITIALIZER; + ck_ring_buffer_t buf; if (aff_iterate(&a)) { perror("ERROR: Could not affine thread"); exit(EXIT_FAILURE); } + buf.ring = context->buffer; if (context->tid == 0) { struct entry *entries; @@ -86,10 +90,11 @@ test(void *c) entries[i].tid = 0; if (i & 1) { - r = ck_ring_enqueue_spsc(ring, entries + i); + r = ck_ring_enqueue_spsc(ring, buf, + entries + i); } else { r = ck_ring_enqueue_spsc_size(ring, - entries + i, &s); + buf, entries + i, &s); if ((int)s != i) { ck_error("Size is %u, expected %d\n", @@ -115,7 +120,9 @@ test(void *c) for (i = 0; i < ITERATIONS; i++) { for (j = 0; j < size; j++) { - while (ck_ring_dequeue_spsc(ring + context->previous, &entry) == false); + buf.ring = _context[context->previous].buffer; + while (ck_ring_dequeue_spsc(ring + context->previous, + buf, &entry) == false); if (context->previous != (unsigned int)entry->tid) { ck_error("[%u:%p] %u != %u\n", @@ -128,12 +135,13 @@ test(void *c) } entry->tid = context->tid; + buf.ring = context->buffer; if (i & 1) { r = ck_ring_enqueue_spsc(ring + context->tid, - entry); + buf, entry); } else { r = ck_ring_enqueue_spsc_size(ring + - context->tid, entry, &s); + context->tid, buf, entry, &s); if ((int)s >= size) { ck_error("Size %u is out of range %d\n", @@ -152,7 +160,6 @@ main(int argc, char *argv[]) { int i, r; void *buffer; - struct context *context; pthread_t *thread; if (argc != 4) { @@ -172,29 +179,30 @@ main(int argc, char *argv[]) ring = malloc(sizeof(ck_ring_t) * nthr); assert(ring); - context = malloc(sizeof(*context) * nthr); - assert(context); + _context = malloc(sizeof(*_context) * nthr); + assert(_context); thread = malloc(sizeof(pthread_t) * nthr); assert(thread); for (i = 0; i < nthr; i++) { - context[i].tid = i; + _context[i].tid = i; if (i == 0) { - context[i].previous = nthr - 1; - context[i].next = i + 1; + _context[i].previous = nthr - 1; + _context[i].next = i + 1; } else if (i == nthr - 1) { - context[i].next = 0; - context[i].previous = i - 1; + _context[i].next = 0; + _context[i].previous = i - 1; } else { - context[i].next = i + 1; - context[i].previous = i - 1; + _context[i].next = i + 1; + _context[i].previous = i - 1; } buffer = malloc(sizeof(void *) * (size + 1)); assert(buffer); - ck_ring_init(ring + i, buffer, size + 1); - r = pthread_create(thread + i, NULL, test, context + i); + _context[i].buffer = buffer; + ck_ring_init(ring + i, size + 1); + r = pthread_create(thread + i, NULL, test, _context + i); assert(r == 0); }