ck_bag: Various bug fixes.

Add necessary load fence to iterator.
Initialize iterator appropriately for empty bags.
Improve unit test.
Fix bag linkage bug for non x86_64 targets.
Fix block accounting on removal.
ck_pring
Abel Mathew 13 years ago
parent f1293b379b
commit eaa8ad1d4d

@ -31,6 +31,7 @@
#include <ck_cc.h>
#include <ck_pr.h>
#include <ck_malloc.h>
#include <ck_stdint.h>
#include <stdbool.h>
#include <stddef.h>
@ -53,7 +54,7 @@
/*
* Growth strategies
* Bag growth strategies.
*/
enum ck_bag_allocation_strategy {
CK_BAG_ALLOCATE_GEOMETRIC = 0,
@ -61,9 +62,9 @@ enum ck_bag_allocation_strategy {
};
/*
* max: max n_entries per block
* bytes: sizeof(ck_bag_block) + sizeof(flex. array member)
* + inline allocator overhead
* max: max n_entries per block
* bytes: sizeof(ck_bag_block) + sizeof(flex. array member)
* + inline allocator overhead
*/
struct ck_bag_block_info {
size_t max;
@ -92,12 +93,9 @@ struct ck_bag {
struct ck_bag_block *head;
struct ck_bag_block *avail_head;
struct ck_bag_block *avail_tail;
unsigned int n_entries;
unsigned int n_blocks;
enum ck_bag_allocation_strategy alloc_strat;
struct ck_bag_block_info info;
};
typedef struct ck_bag ck_bag_t;
@ -109,7 +107,9 @@ struct ck_bag_iterator {
};
typedef struct ck_bag_iterator ck_bag_iterator_t;
#ifdef __x86_64__
#define CK_BAG_BLOCK_ENTRIES_MASK ((uintptr_t)0xFFFF << 48)
#endif
CK_CC_INLINE static struct ck_bag_block *
ck_bag_block_next(struct ck_bag_block *block)
@ -144,13 +144,10 @@ CK_CC_INLINE static void
ck_bag_iterator_init(ck_bag_iterator_t *iterator, ck_bag_t *bag)
{
iterator->index = 0;
iterator->block = ck_pr_load_ptr(&bag->head);
if (iterator->block == NULL) {
iterator->n_entries = 0;
} else {
iterator->index = 0;
if (iterator->block != NULL)
iterator->n_entries = ck_bag_block_count(iterator->block);
}
return;
}
@ -158,7 +155,6 @@ ck_bag_iterator_init(ck_bag_iterator_t *iterator, ck_bag_t *bag)
CK_CC_INLINE static bool
ck_bag_next(struct ck_bag_iterator *iterator, void **entry)
{
uint16_t n_entries;
struct ck_bag_block *next;
if (iterator->block == NULL)
@ -167,20 +163,17 @@ ck_bag_next(struct ck_bag_iterator *iterator, void **entry)
if (iterator->index >= iterator->n_entries) {
next = ck_pr_load_ptr(&iterator->block->next.ptr);
iterator->block = ck_bag_block_next(next);
n_entries = (iterator->block != NULL) ?
ck_bag_block_count(iterator->block) : 0;
if (iterator->block == NULL)
return false;
if (n_entries == 0)
iterator->n_entries = ck_bag_block_count(iterator->block);
if (iterator->n_entries == 0)
return false;
ck_pr_fence_load();
iterator->index = 0;
iterator->n_entries = n_entries;
ck_pr_fence_load();
}
if (iterator->block == NULL)
return false;
*entry = ck_pr_load_ptr(&iterator->block->array[iterator->index++]);
return true;
}

@ -35,7 +35,6 @@
#include "../../common.h"
#define NUM_READER_THREADS 8
#define WRITER_MAX (1 << 17)
#define READ_LATENCY 8
static ck_bag_t bag;
@ -43,6 +42,7 @@ static ck_epoch_t epoch_bag;
static ck_epoch_record_t epoch_wr;
static int leave;
static unsigned int barrier;
static unsigned int writer_max = 131072;
struct bag_epoch {
ck_epoch_entry_t epoch_entry;
@ -106,11 +106,12 @@ reader(void *arg)
* guarantee across the bag.
*/
for (;;) {
ck_epoch_read_begin(&epoch_record);
ck_bag_iterator_init(&iterator, &bag);
curr_max = prev_max = prev = -1;
block = NULL;
while (ck_bag_next(&iterator, &curr_ptr)) {
while (ck_bag_next(&iterator, &curr_ptr) == true) {
if (block != iterator.block) {
prev = -1;
curr = 0;
@ -122,11 +123,13 @@ reader(void *arg)
curr = (uintptr_t)(curr_ptr);
if (curr < prev) {
/* Ascending order within block violated */
fprintf(stderr, "%p: %p: %ju\n", (void *)&epoch_record, (void *)iterator.block, (uintmax_t)curr);
fprintf(stderr, "ERROR: %ju < %ju \n",
(uintmax_t)curr, (uintmax_t)prev);
exit(EXIT_FAILURE);
} else if (prev_max != -1 && curr > prev_max) {
/* Max of prev block > max of current block */
fprintf(stderr, "%p: %p: %ju\n", (void *)&epoch_record, (void *)iterator.block, (uintmax_t)curr);
fprintf(stderr, "ERROR: %ju > prev_max: %ju\n",
(uintmax_t)curr, (uintmax_t)prev_max);
exit(EXIT_FAILURE);
@ -138,6 +141,8 @@ reader(void *arg)
n_entries++;
}
ck_epoch_read_end(&epoch_record);
iterations++;
if (ck_pr_load_int(&leave) == 1)
break;
@ -146,7 +151,7 @@ reader(void *arg)
fprintf(stderr, "Read %llu entries in %llu iterations.\n", n_entries, iterations);
ck_pr_inc_uint(&barrier);
while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS)
while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS + 1)
ck_pr_stall();
return NULL;
@ -157,36 +162,57 @@ static void *
writer_thread(void *unused)
{
unsigned int i;
unsigned int iteration = 0;
(void)unused;
for (;;) {
for (i = 0; i < WRITER_MAX; i++)
ck_bag_put_spmc(&bag, (void *)(uintptr_t)i);
iteration++;
ck_epoch_write_begin(&epoch_wr);
for (i = 1; i <= writer_max; i++) {
if (ck_bag_put_spmc(&bag, (void *)(uintptr_t)i) == false) {
perror("ck_bag_put_spmc");
exit(EXIT_FAILURE);
}
if (ck_bag_member_spmc(&bag, (void *)(uintptr_t)i) == false) {
fprintf(stderr, "ck_bag_put_spmc [%u]: %u\n", iteration, i);
exit(EXIT_FAILURE);
}
}
if (ck_pr_load_int(&leave) == 1)
break;
for (i = 0; i < WRITER_MAX >> 1; i++) {
//fprintf(stderr, "set: %d", iteration);
for (i = 1; i < writer_max; i++) {
void *replace = (void *)(uintptr_t)i;
if (ck_bag_set_spmc(&bag, (void *)(uintptr_t)i, replace) == false ||
replace != (void *)(uintptr_t)i) {
if (ck_bag_set_spmc(&bag, (void *)(uintptr_t)i, replace) == false) {
fprintf(stderr, "ERROR: set %ju != %ju",
(uintmax_t)(uintptr_t)replace, (uintmax_t)i);
exit(EXIT_FAILURE);
}
}
if (ck_pr_load_int(&leave) == 1)
break;
for (i = writer_max; i > 0; i--) {
if (ck_bag_member_spmc(&bag, (void *)(uintptr_t)i) == false) {
fprintf(stderr, "ck_bag_member_spmc [%u]: %u\n", iteration, i);
exit(EXIT_FAILURE);
}
if (ck_bag_remove_spmc(&bag, (void *)(uintptr_t)i) == false) {
fprintf(stderr, "ck_bag_remove_spmc [%u]: %u\n", iteration, i);
exit(EXIT_FAILURE);
}
}
for (i = WRITER_MAX; i > 1; i--)
ck_bag_remove_spmc(&bag, (void *)((uintptr_t)i - 1));
ck_epoch_write_end(&epoch_wr);
}
while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS)
ck_pr_stall();
ck_pr_inc_uint(&barrier);
return NULL;
}
@ -200,7 +226,7 @@ main(int argc, char **argv)
ck_bag_iterator_t bag_it;
size_t b = CK_BAG_DEFAULT;
if (argc == 2) {
if (argc >= 2) {
int r = atoi(argv[1]);
if (r <= 0) {
fprintf(stderr, "# entries in block must be > 0\n");
@ -210,21 +236,46 @@ main(int argc, char **argv)
b = (size_t)r;
}
if (argc >= 3) {
int r = atoi(argv[2]);
if (r < 16) {
fprintf(stderr, "# entries must be >= 16\n");
exit(EXIT_FAILURE);
}
writer_max = (unsigned int)r;
}
ck_epoch_init(&epoch_bag, 100);
ck_epoch_register(&epoch_bag, &epoch_wr);
ck_bag_allocator_set(&allocator, sizeof(struct bag_epoch));
ck_bag_init(&bag, b, CK_BAG_ALLOCATE_GEOMETRIC);
fprintf(stderr, "Configuration: %zu bytes/block, %zu entries/block\n", bag.info.bytes, bag.info.max);
fprintf(stderr, "Configuration: %u entries, %zu bytes/block, %zu entries/block\n", writer_max, bag.info.bytes, bag.info.max);
i = 1;
/* Basic Test */
ck_bag_put_spmc(&bag, (void *)(uintptr_t)i);
ck_bag_remove_spmc(&bag, (void *)(uintptr_t)i);
ck_bag_put_spmc(&bag, (void *)(uintptr_t)i);
/* Sequential test */
for (i = 0; i < 10; i++)
for (i = 1; i <= 10; i++)
ck_bag_put_spmc(&bag, (void *)(uintptr_t)i);
for (i = 1; i <= 10; i++)
ck_bag_remove_spmc(&bag, (void *)(uintptr_t)i);
for (i = 10; i > 0; i--)
ck_bag_remove_spmc(&bag, (void *)(uintptr_t)i);
for (i = 1; i <= 10; i++)
ck_bag_put_spmc(&bag, (void *)(uintptr_t)i);
ck_bag_iterator_init(&bag_it, &bag);
while (ck_bag_next(&bag_it, &curr_ptr)) {
curr = (uintptr_t)(curr_ptr);
if (curr > (uintptr_t)i)
fprintf(stderr, "ERROR: %ju != %u", (uintmax_t)curr, i);
fprintf(stderr, "ERROR: %ju != %u\n", (uintmax_t)curr, i);
ck_bag_remove_spmc(&bag, curr_ptr);
}
@ -236,14 +287,14 @@ main(int argc, char **argv)
pthread_create(&readers[i], NULL, reader, NULL);
}
sleep(20);
sleep(10);
ck_pr_store_int(&leave, 1);
for (i = 0; i < NUM_READER_THREADS; i++)
pthread_join(readers[i], NULL);
pthread_join(writer, NULL);
fprintf(stderr, "Current Entries: %u\n", ck_bag_count(&bag));
fprintf(stderr, "Current entries: %u\n", ck_bag_count(&bag));
ck_bag_destroy(&bag);
return 0;
}

@ -34,10 +34,7 @@
#include <stdbool.h>
#include <string.h>
#ifndef CK_BAG_PAGESIZE
#define CK_BAG_PAGESIZE CK_MD_PAGESIZE
#endif
#define CK_BAG_MAX_N_ENTRIES (1 << 12)
static struct ck_malloc allocator;
@ -67,11 +64,9 @@ ck_bag_init(struct ck_bag *bag,
block_overhead = sizeof(struct ck_bag_block) + allocator_overhead;
if (n_block_entries == CK_BAG_DEFAULT) {
bag->info.max = (CK_BAG_PAGESIZE - block_overhead) / sizeof(void *);
bag->info.max = ((CK_BAG_PAGESIZE - block_overhead) / sizeof(void *));
} else {
bag->info.max = (n_block_entries * CK_MD_CACHELINE - block_overhead) / sizeof(void *);
if (n_block_entries > bag->info.max)
bag->info.max = n_block_entries;
bag->info.max = ((n_block_entries * CK_MD_CACHELINE - block_overhead) / sizeof(void *));
}
bag->info.bytes = block_overhead + sizeof(void *) * bag->info.max;
@ -111,7 +106,7 @@ ck_bag_put_spmc(struct ck_bag *bag, void *entry)
struct ck_bag_block *cursor, *new_block, *new_block_prev, *new_tail;
uint16_t n_entries_block;
size_t blocks_alloc, i;
uintptr_t next;
uintptr_t next = 0;
new_block = new_block_prev = new_tail = NULL;
@ -142,6 +137,10 @@ ck_bag_put_spmc(struct ck_bag *bag, void *entry)
if (i == 0)
new_tail = new_block;
#ifndef __x86_64__
new_block->next.n_entries = 0;
#endif
new_block->next.ptr = new_block_prev;
new_block->avail_next = new_block_prev;
if (new_block_prev != NULL)
@ -162,7 +161,7 @@ ck_bag_put_spmc(struct ck_bag *bag, void *entry)
bag->n_blocks += blocks_alloc; /* n_blocks and n_avail_blocks? */
}
/* Update the Available List */
/* Update the available list */
if (new_block != NULL) {
if (bag->avail_tail != NULL) {
cursor->avail_prev = bag->avail_tail;
@ -173,56 +172,59 @@ ck_bag_put_spmc(struct ck_bag *bag, void *entry)
}
bag->avail_tail = new_tail;
} else if (n_entries_block == bag->info.max - 1) {
/* New entry will fill up block, remove from avail list */
if (cursor->avail_prev != NULL)
cursor->avail_prev->avail_next = cursor->avail_next;
} else if (n_entries_block == bag->info.max-1) {
/* New entry will fill up block, remove from avail list */
if (cursor->avail_prev != NULL)
cursor->avail_prev->avail_next = cursor->avail_next;
if (cursor->avail_next != NULL)
cursor->avail_next->avail_prev = cursor->avail_prev;
if (cursor->avail_next != NULL)
cursor->avail_next->avail_prev = cursor->avail_prev;
if (bag->avail_head == cursor)
bag->avail_head = cursor->avail_next;
if (bag->avail_head == cursor)
bag->avail_head = cursor->avail_next;
if (bag->avail_tail == cursor)
bag->avail_tail = cursor->avail_prev;
if (bag->avail_tail == cursor)
bag->avail_tail = cursor->avail_prev;
/* For debugging purposes */
cursor->avail_next = NULL;
cursor->avail_prev = NULL;
/* For debugging purposes */
cursor->avail_next = NULL;
cursor->avail_prev = NULL;
}
/* Update array and block->n_entries */
cursor->array[n_entries_block++] = entry;
ck_pr_fence_store();
#ifdef __x86_64__
next = ((uintptr_t)n_entries_block << 48);
#endif
ck_pr_fence_store();
/* Update bag's list */
if (n_entries_block == 1) {
if (bag->head != NULL) {
#ifdef __x86_64__
if (bag->head != NULL)
next += ((uintptr_t)(void *)ck_bag_block_next(bag->head));
#else
next = (uintptr_t)(void *)ck_bag_block_next(bag->head);
#endif
}
#ifndef __x86_64__
ck_pr_store_ptr(&cursor->next.n_entries, (void *)(uintptr_t)n_entries_block);
if (bag->head != NULL)
next = (uintptr_t)ck_bag_block_next(bag->head->next.ptr);
else
next = 0;
#endif
ck_pr_store_ptr(&cursor->next.ptr, (void *)next);
ck_pr_store_ptr(&bag->head, cursor);
} else {
#ifdef __x86_64__
next += ((uintptr_t)(void *)ck_bag_block_next(cursor->next.ptr));
ck_pr_store_ptr(&cursor->next, (void *)next);
#else
ck_pr_store_ptr(&cursor->next.n_entries, (void *)(uintptr_t)n_entries_block);
#endif
}
ck_pr_store_uint(&bag->n_entries, bag->n_entries + 1);
@ -230,10 +232,7 @@ ck_bag_put_spmc(struct ck_bag *bag, void *entry)
}
/*
* Set
* Replace prev_entry with new entry if exists, otherwise insert into bag
*
* On return, new_entry = prev_entry on replacement, NULL on insertion.
* Replace prev_entry with new entry if exists, otherwise insert into bag.
*/
bool
ck_bag_set_spmc(struct ck_bag *bag, void *compare, void *update)
@ -269,11 +268,13 @@ ck_bag_remove_spmc(struct ck_bag *bag, void *entry)
prev = NULL;
while (cursor != NULL) {
n_entries = ck_bag_block_count(cursor);
for (block_index = 0; block_index < n_entries; block_index++) {
if (cursor->array[block_index] == entry)
goto found;
}
prev = cursor;
cursor = ck_bag_block_next(cursor->next.ptr);
}
@ -293,7 +294,7 @@ found:
next = ((uintptr_t)prev->next.ptr & (CK_BAG_BLOCK_ENTRIES_MASK)) |
(uintptr_t)(void *)ck_bag_block_next(cursor->next.ptr);
#else
next = (uintptr_t)cursor->next.ptr;
next = (uintptr_t)(void *)cursor->next.ptr;
#endif
ck_pr_store_ptr(&prev->next.ptr, (struct ck_bag_block *)next);
}
@ -305,27 +306,28 @@ found:
if (cursor->avail_next != NULL)
cursor->avail_next->avail_prev = cursor->avail_prev;
bag->n_blocks--;
copy = cursor->avail_next;
} else {
uintptr_t next_ptr;
copy = allocator.malloc(bag->info.bytes);
copy = allocator.malloc(bag->info.bytes);
if (copy == NULL)
return false;
memcpy(copy, cursor, bag->info.bytes);
copy->array[block_index] = copy->array[--n_entries];
ck_pr_fence_store();
next_ptr = (uintptr_t)(void *)ck_bag_block_next(copy->next.ptr);
#ifdef __x86_64__
copy->next.ptr = (void *)(((uintptr_t)n_entries << 48) | next_ptr);
#else
copy->next.n_entries = n_entries;
copy->next.ptr = (void *)next_ptr;
copy->next.ptr = (struct ck_bag_block *)next_ptr;
#endif
ck_pr_fence_store();
if (prev == NULL) {
ck_pr_store_ptr(&bag->head, copy);
} else {

Loading…
Cancel
Save