ck_ring: Move the ring buffer outside of the ck_ring_t

Remove the ring buffer from struct ck_ring; callers are now required to
explicitly pass the ring buffer to the enqueue/dequeue operations. This can be
useful for systems with multiple address spaces.
Olivier Houchard 11 years ago
parent 4c878ff1de
commit 3edb523da5
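For reference, the new calling convention boils down to: allocate and zero the slot array yourself, wrap it in a ck_ring_buffer_t, and hand that buffer to every enqueue/dequeue call alongside the ck_ring_t. Below is a minimal single-producer/single-consumer sketch against the API introduced in this diff; the buffer size and the enqueued item are illustrative, and the size is kept a power of two because the ring masks indices with size - 1.

#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include <ck_ring.h>

int
main(void)
{
	ck_ring_t ring;
	ck_ring_buffer_t buf;
	unsigned int size = 1024;	/* power of two: indices are masked with size - 1 */
	int item = 42;
	void *out;

	/* The caller now owns the slot array; ck_ring_t only tracks indices. */
	buf.ring = malloc(sizeof(void *) * size);
	assert(buf.ring != NULL);
	memset(buf.ring, 0, sizeof(void *) * size);

	ck_ring_init(&ring, size);

	/* The buffer is passed by value to every enqueue and dequeue. */
	if (ck_ring_enqueue_spsc(&ring, buf, &item) == false)
		return EXIT_FAILURE;

	if (ck_ring_dequeue_spsc(&ring, buf, &out) == false)
		return EXIT_FAILURE;

	assert(out == &item);
	free(buf.ring);
	return EXIT_SUCCESS;
}

The spmc enqueue/dequeue variants take the same buffer argument, as the test and benchmark changes below show.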

@@ -36,187 +36,6 @@
/*
* Concurrent ring buffer.
*/
#define CK_RING(type, name) \
struct ck_ring_##name { \
unsigned int c_head; \
char pad[CK_MD_CACHELINE - sizeof(unsigned int)]; \
unsigned int p_tail; \
char _pad[CK_MD_CACHELINE - sizeof(unsigned int)]; \
unsigned int size; \
unsigned int mask; \
struct type *ring; \
}; \
CK_CC_INLINE static void \
ck_ring_init_##name(struct ck_ring_##name *ring, \
struct type *buffer, \
unsigned int size) \
{ \
\
ring->size = size; \
ring->mask = size - 1; \
ring->p_tail = 0; \
ring->c_head = 0; \
ring->ring = buffer; \
return; \
} \
CK_CC_INLINE static unsigned int \
ck_ring_size_##name(struct ck_ring_##name *ring) \
{ \
unsigned int c, p; \
\
c = ck_pr_load_uint(&ring->c_head); \
p = ck_pr_load_uint(&ring->p_tail); \
return (p - c) & ring->mask; \
} \
CK_CC_INLINE static unsigned int \
ck_ring_capacity_##name(struct ck_ring_##name *ring) \
{ \
\
return ring->size; \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spsc_size_##name(struct ck_ring_##name *ring, \
struct type *entry, \
unsigned int *size) \
{ \
unsigned int consumer, producer, delta; \
unsigned int mask = ring->mask; \
\
consumer = ck_pr_load_uint(&ring->c_head); \
producer = ring->p_tail; \
delta = producer + 1; \
*size = (producer - consumer) & mask; \
\
if ((delta & mask) == (consumer & mask)) \
return false; \
\
ring->ring[producer & mask] = *entry; \
ck_pr_fence_store(); \
ck_pr_store_uint(&ring->p_tail, delta); \
return true; \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spsc_##name(struct ck_ring_##name *ring, \
struct type *entry) \
{ \
unsigned int consumer, producer, delta; \
unsigned int mask = ring->mask; \
\
consumer = ck_pr_load_uint(&ring->c_head); \
producer = ring->p_tail; \
delta = producer + 1; \
\
if ((delta & mask) == (consumer & mask)) \
return false; \
\
ring->ring[producer & mask] = *entry; \
ck_pr_fence_store(); \
ck_pr_store_uint(&ring->p_tail, delta); \
return true; \
} \
CK_CC_INLINE static bool \
ck_ring_dequeue_spsc_##name(struct ck_ring_##name *ring, \
struct type *data) \
{ \
unsigned int consumer, producer; \
unsigned int mask = ring->mask; \
\
consumer = ring->c_head; \
producer = ck_pr_load_uint(&ring->p_tail); \
\
if (consumer == producer) \
return false; \
\
ck_pr_fence_load(); \
*data = ring->ring[consumer & mask]; \
ck_pr_fence_store(); \
ck_pr_store_uint(&ring->c_head, consumer + 1); \
\
return true; \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spmc_size_##name(struct ck_ring_##name *ring, \
void *entry, unsigned int *size) \
{ \
\
return ck_ring_enqueue_spsc_size_##name(ring, entry, size); \
} \
CK_CC_INLINE static bool \
ck_ring_enqueue_spmc_##name(struct ck_ring_##name *ring, void *entry) \
{ \
\
return ck_ring_enqueue_spsc_##name(ring, entry); \
} \
CK_CC_INLINE static bool \
ck_ring_trydequeue_spmc_##name(struct ck_ring_##name *ring, \
struct type *data) \
{ \
unsigned int consumer, producer; \
unsigned int mask = ring->mask; \
\
consumer = ck_pr_load_uint(&ring->c_head); \
ck_pr_fence_load(); \
producer = ck_pr_load_uint(&ring->p_tail); \
\
if (consumer == producer) \
return false; \
\
ck_pr_fence_load(); \
*data = ring->ring[consumer & mask]; \
ck_pr_fence_memory(); \
return ck_pr_cas_uint(&ring->c_head, \
consumer, \
consumer + 1); \
} \
CK_CC_INLINE static bool \
ck_ring_dequeue_spmc_##name(struct ck_ring_##name *ring, \
struct type *data) \
{ \
unsigned int consumer, producer; \
unsigned int mask = ring->mask; \
\
consumer = ck_pr_load_uint(&ring->c_head); \
do { \
ck_pr_fence_load(); \
producer = ck_pr_load_uint(&ring->p_tail); \
\
if (consumer == producer) \
return false; \
\
ck_pr_fence_load(); \
*data = ring->ring[consumer & mask]; \
ck_pr_fence_memory(); \
} while (ck_pr_cas_uint_value(&ring->c_head, \
consumer, \
consumer + 1, \
&consumer) == false); \
\
return true; \
}
#define CK_RING_INSTANCE(name) \
struct ck_ring_##name
#define CK_RING_INIT(name, object, buffer, size) \
ck_ring_init_##name(object, buffer, size)
#define CK_RING_SIZE(name, object) \
ck_ring_size_##name(object)
#define CK_RING_CAPACITY(name, object) \
ck_ring_capacity_##name(object)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, object, value, s) \
ck_ring_enqueue_spsc_size_##name(object, value, s)
#define CK_RING_ENQUEUE_SPSC(name, object, value) \
ck_ring_enqueue_spsc_##name(object, value)
#define CK_RING_DEQUEUE_SPSC(name, object, value) \
ck_ring_dequeue_spsc_##name(object, value)
#define CK_RING_DEQUEUE_SPMC(name, object, value) \
ck_ring_dequeue_spmc_##name(object, value)
#define CK_RING_TRYDEQUEUE_SPMC(name, object, value) \
ck_ring_trydequeue_spmc_##name(object, value)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, object, value, s) \
ck_ring_enqueue_spmc_size_##name(object, value, s)
#define CK_RING_ENQUEUE_SPMC(name, object, value) \
ck_ring_enqueue_spmc_##name(object, value)
struct ck_ring {
unsigned int c_head;
@@ -225,7 +44,6 @@ struct ck_ring {
char _pad[CK_MD_CACHELINE - sizeof(unsigned int)];
unsigned int size;
unsigned int mask;
void **ring;
};
typedef struct ck_ring ck_ring_t;
@@ -246,6 +64,12 @@ ck_ring_capacity(struct ck_ring *ring)
return ring->size;
}
struct ck_ring_buffer {
void *ring;
};
typedef struct ck_ring_buffer ck_ring_buffer_t;
/*
* Atomically enqueues the specified entry. Returns true on success, returns
* false if the ck_ring is full. This operation only support one active
@@ -258,12 +82,13 @@ ck_ring_capacity(struct ck_ring *ring)
* writer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
ck_ring_enqueue_spsc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
void *entry,
unsigned int *size)
{
unsigned int consumer, producer, delta;
unsigned int mask = ring->mask;
void **ring_buf = buf.ring;
consumer = ck_pr_load_uint(&ring->c_head);
producer = ring->p_tail;
@@ -273,7 +98,7 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring,
if ((delta & mask) == (consumer & mask))
return false;
ring->ring[producer & mask] = entry;
ring_buf[producer & mask] = entry;
/*
* Make sure to update slot value before indicating
@@ -291,10 +116,11 @@ ck_ring_enqueue_spsc_size(struct ck_ring *ring,
* of ck_ring_dequeue_spsc.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry)
ck_ring_enqueue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
{
unsigned int consumer, producer, delta;
unsigned int mask = ring->mask;
void **ring_buf = buf.ring;
consumer = ck_pr_load_uint(&ring->c_head);
producer = ring->p_tail;
@@ -303,7 +129,7 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry)
if ((delta & mask) == (consumer & mask))
return false;
ring->ring[producer & mask] = entry;
ring_buf[producer & mask] = entry;
/*
* Make sure to update slot value before indicating
@@ -318,10 +144,11 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry)
* Single consumer and single producer ring buffer dequeue (consumer).
*/
CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring, void *data)
ck_ring_dequeue_spsc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
{
unsigned int consumer, producer;
unsigned int mask = ring->mask;
void **ring_buf = buf.ring;
consumer = ring->c_head;
producer = ck_pr_load_uint(&ring->p_tail);
@@ -342,7 +169,7 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, void *data)
* troublesome on platforms where sizeof(void *)
* is not guaranteed to be sizeof(T *).
*/
ck_pr_store_ptr(data, ring->ring[consumer & mask]);
ck_pr_store_ptr(data, ring_buf[consumer & mask]);
ck_pr_fence_store();
ck_pr_store_uint(&ring->c_head, consumer + 1);
return true;
@@ -360,12 +187,12 @@ ck_ring_dequeue_spsc(struct ck_ring *ring, void *data)
* writer.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
ck_ring_enqueue_spmc_size(struct ck_ring *ring, ck_ring_buffer_t buf,
void *entry,
unsigned int *size)
{
return ck_ring_enqueue_spsc_size(ring, entry, size);
return ck_ring_enqueue_spsc_size(ring, buf, entry, size);
}
/*
@@ -375,17 +202,18 @@ ck_ring_enqueue_spmc_size(struct ck_ring *ring,
* invocations of ck_ring_dequeue_spmc.
*/
CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring *ring, void *entry)
ck_ring_enqueue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *entry)
{
return ck_ring_enqueue_spsc(ring, entry);
return ck_ring_enqueue_spsc(ring, buf, entry);
}
CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring *ring, void *data)
ck_ring_trydequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
{
unsigned int consumer, producer;
unsigned int mask = ring->mask;
void **ring_buf = buf.ring;
consumer = ck_pr_load_uint(&ring->c_head);
ck_pr_fence_load();
@@ -395,18 +223,19 @@ ck_ring_trydequeue_spmc(struct ck_ring *ring, void *data)
return false;
ck_pr_fence_load();
ck_pr_store_ptr(data, ring->ring[consumer & mask]);
ck_pr_store_ptr(data, ring_buf[consumer & mask]);
ck_pr_fence_memory();
return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}
CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring *ring, void *data)
ck_ring_dequeue_spmc(struct ck_ring *ring, ck_ring_buffer_t buf, void *data)
{
unsigned int consumer, producer;
unsigned int mask = ring->mask;
void *r;
void **ring_buf = buf.ring;
consumer = ck_pr_load_uint(&ring->c_head);
@@ -430,7 +259,7 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, void *data)
* volatile load to force volatile semantics while allowing
* for r itself to remain aliased across the loop.
*/
r = ck_pr_load_ptr(&ring->ring[consumer & mask]);
r = ck_pr_load_ptr(&ring_buf[consumer & mask]);
/* Serialize load with respect to head update. */
ck_pr_fence_memory();
@@ -448,15 +277,13 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, void *data)
}
CK_CC_INLINE static void
ck_ring_init(struct ck_ring *ring, void *buffer, unsigned int size)
ck_ring_init(struct ck_ring *ring, unsigned int size)
{
memset(buffer, 0, sizeof(void *) * size);
ring->size = size;
ring->mask = size - 1;
ring->p_tail = 0;
ring->c_head = 0;
ring->ring = buffer;
return;
}

@@ -14,8 +14,6 @@ struct entry {
int tid;
int value;
};
CK_RING(entry, entry_ring)
static CK_RING_INSTANCE(entry_ring) ring;
int
main(int argc, char *argv[])
@@ -24,6 +22,8 @@ main(int argc, char *argv[])
uint64_t s, e, e_a, d_a;
struct entry *buffer;
struct entry entry = {0, 0};
ck_ring_buffer_t buf;
ck_ring_t ring;
if (argc != 2) {
ck_error("Usage: latency <size>\n");
@@ -39,26 +39,27 @@
ck_error("ERROR: Failed to allocate buffer\n");
}
CK_RING_INIT(entry_ring, &ring, buffer, size);
ck_ring_init(&ring, size);
buf.ring = buffer;
e_a = d_a = s = e = 0;
for (r = 0; r < ITERATIONS; r++) {
for (i = 0; i < size / 4; i += 4) {
s = rdtsc();
CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry);
CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry);
CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry);
CK_RING_ENQUEUE_SPSC(entry_ring, &ring, &entry);
ck_ring_enqueue_spsc(&ring, buf, &entry);
ck_ring_enqueue_spsc(&ring, buf, &entry);
ck_ring_enqueue_spsc(&ring, buf, &entry);
ck_ring_enqueue_spsc(&ring, buf, &entry);
e = rdtsc();
}
e_a += (e - s) / 4;
for (i = 0; i < size / 4; i += 4) {
s = rdtsc();
CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry);
CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry);
CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry);
CK_RING_DEQUEUE_SPSC(entry_ring, &ring, &entry);
ck_ring_dequeue_spsc(&ring, buf, &entry);
ck_ring_dequeue_spsc(&ring, buf, &entry);
ck_ring_dequeue_spsc(&ring, buf, &entry);
ck_ring_dequeue_spsc(&ring, buf, &entry);
e = rdtsc();
}
d_a += (e - s) / 4;
@@ -70,20 +71,20 @@ main(int argc, char *argv[])
for (r = 0; r < ITERATIONS; r++) {
for (i = 0; i < size / 4; i += 4) {
s = rdtsc();
CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry);
CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry);
CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry);
CK_RING_ENQUEUE_SPMC(entry_ring, &ring, &entry);
ck_ring_enqueue_spmc(&ring, buf, &entry);
ck_ring_enqueue_spmc(&ring, buf, &entry);
ck_ring_enqueue_spmc(&ring, buf, &entry);
ck_ring_enqueue_spmc(&ring, buf, &entry);
e = rdtsc();
}
e_a += (e - s) / 4;
for (i = 0; i < size / 4; i += 4) {
s = rdtsc();
CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry);
CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry);
CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry);
CK_RING_DEQUEUE_SPMC(entry_ring, &ring, &entry);
ck_ring_dequeue_spmc(&ring, buf, &entry);
ck_ring_dequeue_spmc(&ring, buf, &entry);
ck_ring_dequeue_spmc(&ring, buf, &entry);
ck_ring_dequeue_spmc(&ring, buf, &entry);
e = rdtsc();
}
d_a += (e - s) / 4;

@@ -1,30 +1,21 @@
.PHONY: check clean distribution
OBJECTS=ck_ring_spsc ck_ring_spsc_template ck_ring_spmc ck_ring_spmc_template
OBJECTS=ck_ring_spsc ck_ring_spmc
SIZE=16384
CFLAGS += -g2
all: $(OBJECTS)
check: all
./ck_ring_spsc $(CORES) 1 $(SIZE)
./ck_ring_spsc_template $(CORES) 1 $(SIZE)
./ck_ring_spmc $(CORES) 1 $(SIZE)
./ck_ring_spmc_template $(CORES) 1 $(SIZE)
ck_ring_spsc_template: ck_ring_spsc_template.c ../../../include/ck_ring.h
$(CC) $(CFLAGS) -o ck_ring_spsc_template ck_ring_spsc_template.c \
../../../src/ck_barrier_centralized.c
ck_ring_spmc_template: ck_ring_spmc_template.c ../../../include/ck_ring.h
$(CC) $(CFLAGS) -o ck_ring_spmc_template ck_ring_spmc_template.c \
../../../src/ck_barrier_centralized.c
ck_ring_spsc: ck_ring_spsc.c ../../../include/ck_ring.h
$(CC) $(CFLAGS) -o ck_ring_spsc ck_ring_spsc.c \
../../../src/ck_barrier_centralized.c
ck_ring_spmc: ck_ring_spmc.c ../../../include/ck_ring.h
$(CC) $(CFLAGS) -o ck_ring_spmc ck_ring_spmc.c \
$(CC) $(CFLAGS) -g2 -o ck_ring_spmc ck_ring_spmc.c \
../../../src/ck_barrier_centralized.c
clean:

@@ -43,6 +43,7 @@ struct context {
unsigned int tid;
unsigned int previous;
unsigned int next;
void *buffer;
};
struct entry {
@@ -60,6 +61,7 @@ static struct affinity a;
static int size;
static int eb;
static ck_barrier_centralized_t barrier = CK_BARRIER_CENTRALIZED_INITIALIZER;
static struct context *_context;
static void *
test_spmc(void *c)
@@ -68,8 +70,10 @@ test_spmc(void *c)
unsigned long previous = 0;
unsigned int seed;
int i, k, j, tid;
struct context *context = c;
ck_ring_buffer_t buf;
(void)c;
buf.ring = context->buffer;
if (aff_iterate(&a)) {
perror("ERROR: Could not affine thread");
exit(EXIT_FAILURE);
@@ -86,9 +90,11 @@ test_spmc(void *c)
/* Keep trying until we encounter at least one node. */
if (j & 1) {
while (ck_ring_dequeue_spmc(&ring_spmc, &o) == false);
while (ck_ring_dequeue_spmc(&ring_spmc, buf,
&o) == false);
} else {
while (ck_ring_trydequeue_spmc(&ring_spmc, &o) == false);
while (ck_ring_trydequeue_spmc(&ring_spmc, buf,
&o) == false);
}
observed++;
@@ -132,9 +138,12 @@ test(void *c)
unsigned int s;
int i, j;
bool r;
ck_ring_buffer_t buf;
ck_barrier_centralized_state_t sense =
CK_BARRIER_CENTRALIZED_STATE_INITIALIZER;
buf.ring = context->buffer;
if (aff_iterate(&a)) {
perror("ERROR: Could not affine thread");
exit(EXIT_FAILURE);
@@ -156,9 +165,10 @@ test(void *c)
entries[i].tid = 0;
if (i & 1) {
r = ck_ring_enqueue_spmc(ring, entries + i);
r = ck_ring_enqueue_spmc(ring, buf,
entries + i);
} else {
r = ck_ring_enqueue_spmc_size(ring,
r = ck_ring_enqueue_spmc_size(ring, buf,
entries + i, &s);
if ((int)s != i) {
@@ -188,7 +198,9 @@ test(void *c)
for (i = 0; i < ITERATIONS; i++) {
for (j = 0; j < size; j++) {
while (ck_ring_dequeue_spmc(ring + context->previous, &entry) == false);
buf.ring = _context[context->previous].buffer;
while (ck_ring_dequeue_spmc(ring + context->previous,
buf, &entry) == false);
if (context->previous != (unsigned int)entry->tid) {
ck_error("[%u:%p] %u != %u\n",
@@ -201,13 +213,14 @@ test(void *c)
}
entry->tid = context->tid;
buf.ring = context->buffer;
if (i & 1) {
r = ck_ring_enqueue_spmc(ring + context->tid,
entry);
buf, entry);
} else {
r = ck_ring_enqueue_spmc_size(ring + context->tid,
entry, &s);
buf, entry, &s);
if ((int)s >= size) {
ck_error("Size %u out of range of %d\n",
@@ -227,8 +240,8 @@ main(int argc, char *argv[])
int i, r;
void *buffer;
unsigned long l;
struct context *context;
pthread_t *thread;
ck_ring_buffer_t buf;
if (argc != 4) {
ck_error("Usage: validate <threads> <affinity delta> <size>\n");
@@ -247,31 +260,32 @@ main(int argc, char *argv[])
ring = malloc(sizeof(ck_ring_t) * nthr);
assert(ring);
context = malloc(sizeof(*context) * nthr);
assert(context);
_context = malloc(sizeof(*_context) * nthr);
assert(_context);
thread = malloc(sizeof(pthread_t) * nthr);
assert(thread);
fprintf(stderr, "SPSC test:");
for (i = 0; i < nthr; i++) {
context[i].tid = i;
_context[i].tid = i;
if (i == 0) {
context[i].previous = nthr - 1;
context[i].next = i + 1;
_context[i].previous = nthr - 1;
_context[i].next = i + 1;
} else if (i == nthr - 1) {
context[i].next = 0;
context[i].previous = i - 1;
_context[i].next = 0;
_context[i].previous = i - 1;
} else {
context[i].next = i + 1;
context[i].previous = i - 1;
_context[i].next = i + 1;
_context[i].previous = i - 1;
}
buffer = malloc(sizeof(void *) * (size + 1));
assert(buffer);
memset(buffer, 0, sizeof(void *) * (size + 1));
ck_ring_init(ring + i, buffer, size + 1);
r = pthread_create(thread + i, NULL, test, context + i);
_context[i].buffer = buffer;
ck_ring_init(ring + i, size + 1);
r = pthread_create(thread + i, NULL, test, _context + i);
assert(r == 0);
}
@@ -284,9 +298,11 @@ main(int argc, char *argv[])
buffer = malloc(sizeof(void *) * (size + 1));
assert(buffer);
memset(buffer, 0, sizeof(void *) * (size + 1));
ck_ring_init(&ring_spmc, buffer, size + 1);
ck_ring_init(&ring_spmc, size + 1);
buf.ring = buffer;
for (i = 0; i < nthr - 1; i++) {
r = pthread_create(thread + i, NULL, test_spmc, context + i);
_context[i].buffer = buffer;
r = pthread_create(thread + i, NULL, test_spmc, _context + i);
assert(r == 0);
}
@@ -302,13 +318,14 @@ main(int argc, char *argv[])
/* Wait until queue is not full. */
if (l & 1) {
while (ck_ring_enqueue_spmc(&ring_spmc, entry) == false)
while (ck_ring_enqueue_spmc(&ring_spmc, buf,
entry) == false)
ck_pr_stall();
} else {
unsigned int s;
while (ck_ring_enqueue_spmc_size(&ring_spmc,
entry, &s) == false) {
buf, entry, &s) == false) {
ck_pr_stall();
}

@@ -41,6 +41,7 @@ struct context {
unsigned int tid;
unsigned int previous;
unsigned int next;
void *buffer;
};
struct entry {
@@ -53,6 +54,7 @@ static ck_ring_t *ring;
static struct affinity a;
static int size;
static ck_barrier_centralized_t barrier = CK_BARRIER_CENTRALIZED_INITIALIZER;
static struct context *_context;
static void *
test(void *c)
@@ -64,12 +66,14 @@ test(void *c)
bool r;
ck_barrier_centralized_state_t sense =
CK_BARRIER_CENTRALIZED_STATE_INITIALIZER;
ck_ring_buffer_t buf;
if (aff_iterate(&a)) {
perror("ERROR: Could not affine thread");
exit(EXIT_FAILURE);
}
buf.ring = context->buffer;
if (context->tid == 0) {
struct entry *entries;
@@ -86,10 +90,11 @@ test(void *c)
entries[i].tid = 0;
if (i & 1) {
r = ck_ring_enqueue_spsc(ring, entries + i);
r = ck_ring_enqueue_spsc(ring, buf,
entries + i);
} else {
r = ck_ring_enqueue_spsc_size(ring,
entries + i, &s);
buf, entries + i, &s);
if ((int)s != i) {
ck_error("Size is %u, expected %d\n",
@@ -115,7 +120,9 @@ test(void *c)
for (i = 0; i < ITERATIONS; i++) {
for (j = 0; j < size; j++) {
while (ck_ring_dequeue_spsc(ring + context->previous, &entry) == false);
buf.ring = _context[context->previous].buffer;
while (ck_ring_dequeue_spsc(ring + context->previous,
buf, &entry) == false);
if (context->previous != (unsigned int)entry->tid) {
ck_error("[%u:%p] %u != %u\n",
@@ -128,12 +135,13 @@ test(void *c)
}
entry->tid = context->tid;
buf.ring = context->buffer;
if (i & 1) {
r = ck_ring_enqueue_spsc(ring + context->tid,
entry);
buf, entry);
} else {
r = ck_ring_enqueue_spsc_size(ring +
context->tid, entry, &s);
context->tid, buf, entry, &s);
if ((int)s >= size) {
ck_error("Size %u is out of range %d\n",
@@ -152,7 +160,6 @@ main(int argc, char *argv[])
{
int i, r;
void *buffer;
struct context *context;
pthread_t *thread;
if (argc != 4) {
@@ -172,29 +179,30 @@ main(int argc, char *argv[])
ring = malloc(sizeof(ck_ring_t) * nthr);
assert(ring);
context = malloc(sizeof(*context) * nthr);
assert(context);
_context = malloc(sizeof(*_context) * nthr);
assert(_context);
thread = malloc(sizeof(pthread_t) * nthr);
assert(thread);
for (i = 0; i < nthr; i++) {
context[i].tid = i;
_context[i].tid = i;
if (i == 0) {
context[i].previous = nthr - 1;
context[i].next = i + 1;
_context[i].previous = nthr - 1;
_context[i].next = i + 1;
} else if (i == nthr - 1) {
context[i].next = 0;
context[i].previous = i - 1;
_context[i].next = 0;
_context[i].previous = i - 1;
} else {
context[i].next = i + 1;
context[i].previous = i - 1;
_context[i].next = i + 1;
_context[i].previous = i - 1;
}
buffer = malloc(sizeof(void *) * (size + 1));
assert(buffer);
ck_ring_init(ring + i, buffer, size + 1);
r = pthread_create(thread + i, NULL, test, context + i);
_context[i].buffer = buffer;
ck_ring_init(ring + i, size + 1);
r = pthread_create(thread + i, NULL, test, _context + i);
assert(r == 0);
}
