diff --git a/include/ck_bag.h b/include/ck_bag.h new file mode 100644 index 0000000..bec60c4 --- /dev/null +++ b/include/ck_bag.h @@ -0,0 +1,171 @@ +/* + * Copyright 2012 Abel P. Mathew + * Copyright 2012 Samy Al Bahra + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#ifndef _CK_BAG_H +#define _CK_BAG_H + +#include +#include +#include +#include +#include + +/* + * ck_bag is a lock-free spmc linked list of blocks. + * + * A block consists of: + * next: + * Linkage for bag linked list. + * avail_next, avail_prev: + * Linkage for bag's available linked list (to support 0(1) inserts). + * array: + * flexible array member. + * + * The top 3 bytes of "next" contain the # of entries within block->array. + * + * Valid entries in block->array are contigious and stored at the front + * of the array. Empty entries are stored at the back. + */ + + +/* + * Memory Allocation Strategies: + * GEOMETRIC = allocate # of existing blocks * 2. + * LINEAR = only allocate a single block. + */ +enum ck_bag_allocation_strategy { + CK_BAG_ALLOCATE_GEOMETRIC = 0, + CK_BAG_ALLOCATE_LINEAR +}; + +/* + * max: max n_entries per block + * bytes: sizeof(ck_bag_block) + sizeof(flex. array member) + * + inline allocator overhead + */ +struct ck_bag_block_info { + size_t max; + size_t bytes; +}; + +#define CK_BAG_DEFAULT 0 + +struct ck_bag_block { + struct ck_bag_block *next; + struct ck_bag_block *avail_next; + struct ck_bag_block *avail_prev; + + void *array[]; +} CK_CC_CACHELINE; + +struct ck_bag { + struct ck_bag_block *head; + struct ck_bag_block *avail_head; + struct ck_bag_block *avail_tail; + + unsigned int n_entries; + unsigned int n_blocks; + + enum ck_bag_allocation_strategy alloc_strat; + + struct ck_bag_block_info info; +}; +typedef struct ck_bag ck_bag_t; + +struct ck_bag_iterator { + struct ck_bag_block *block; + uint16_t index; +}; +typedef struct ck_bag_iterator ck_bag_iterator_t; + +#define CK_BAG_BLOCK_ENTRIES_MASK ((uintptr_t)0xFFFF << 48) + +CK_CC_INLINE static struct ck_bag_block * +ck_bag_block_next(struct ck_bag_block *block) +{ + + return (struct ck_bag_block *)((uintptr_t)block & ~CK_BAG_BLOCK_ENTRIES_MASK); +} + +CK_CC_INLINE static unsigned int +ck_bag_count(struct ck_bag *bag) +{ + + return ck_pr_load_uint(&bag->n_entries); +} + +CK_CC_INLINE static uint16_t +ck_bag_block_count(struct ck_bag_block *block) +{ + + return (uintptr_t)ck_pr_load_ptr(&block->next) >> 48; +} + +CK_CC_INLINE static void +ck_bag_iterator_init(ck_bag_iterator_t *iterator, ck_bag_t *bag) +{ + + iterator->block = ck_pr_load_ptr(&bag->head); + iterator->index = 0; + return; +} + +CK_CC_INLINE static bool +ck_bag_next(struct ck_bag_iterator *iterator, void **entry) +{ + uint16_t n_entries; + struct ck_bag_block *next; + + if (iterator->block == NULL) + return NULL; + + n_entries = ck_bag_block_count(iterator->block); + if (iterator->index >= n_entries) { + next = ck_pr_load_ptr(&iterator->block->next); + iterator->block = ck_bag_block_next(next); + if (iterator->block == NULL || ck_bag_block_count(iterator->block) == 0) + return false; + + iterator->index = 0; + } + + if (iterator->block == NULL) + return false; + + *entry = ck_pr_load_ptr(&iterator->block->array[iterator->index++]); + return true; +} + +bool ck_bag_init(struct ck_bag *, size_t, enum ck_bag_allocation_strategy); +bool ck_bag_allocator_set(struct ck_malloc *, size_t); +void ck_bag_destroy(ck_bag_t *); +bool ck_bag_put_spmc(ck_bag_t *, void *); +bool ck_bag_set_spmc(struct ck_bag *, void *, void *); +bool ck_bag_remove_spmc(ck_bag_t *, void *); +bool ck_bag_member_spmc(ck_bag_t *, void *); + +#endif /* CK_BAG_H */ diff --git a/include/ck_md.h b/include/ck_md.h index 173e096..33c5a69 100644 --- a/include/ck_md.h +++ b/include/ck_md.h @@ -31,4 +31,8 @@ #define CK_MD_CACHELINE (64) #endif +#ifndef CK_MD_PAGE_SIZE +#define CK_MD_PAGE_SIZE (4096) +#endif + #endif /* _CK_MD_H */ diff --git a/regressions/ck_bag/validate/Makefile b/regressions/ck_bag/validate/Makefile new file mode 100644 index 0000000..563566f --- /dev/null +++ b/regressions/ck_bag/validate/Makefile @@ -0,0 +1,14 @@ +.PHONY: clean distribution + +OBJECTS=order + +all: $(OBJECTS) + +order: order.c ../../../include/ck_bag.h ../../../src/ck_bag.c + $(CC) $(CFLAGS) -o order order.c ../../../src/ck_bag.c + +clean: + rm -rf *.dSYM *~ *.o $(OBJECTS) + +include ../../../build/regressions.build +CFLAGS+=$(PTHREAD_CFLAGS) -D_GNU_SOURCE diff --git a/regressions/ck_bag/validate/order.c b/regressions/ck_bag/validate/order.c new file mode 100644 index 0000000..9625cee --- /dev/null +++ b/regressions/ck_bag/validate/order.c @@ -0,0 +1,218 @@ +/* + * Copyright 2012 Abel P. Mathew + * Copyright 2012 Samy Al Bahra + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include + +#include "../../common.h" + +#define NUM_READER_THREADS 8 +#define WRITER_MAX (1 << 17) +#define READ_LATENCY 8 + +static ck_bag_t bag; +static int leave; +static unsigned int barrier; + +static void * +bag_malloc(size_t r) +{ + + return malloc(r); +} + +static void +bag_free(void *p, size_t b, bool r) +{ + + (void)p; + (void)b; + (void)r; +// free(p); + return; +} + +static struct ck_malloc allocator = { + .malloc = bag_malloc, + .free = bag_free +}; + +static void * +reader(void *arg) +{ + void *curr_ptr; + intptr_t curr, prev, curr_max, prev_max; + unsigned long long n_entries = 0, iterations = 0; + + (void)arg; + + ck_bag_iterator_t iterator; + struct ck_bag_block *block = NULL; + + /* + * Check if entries within a block are sequential. Since ck_bag inserts + * newly occupied blocks at the beginning of the list, there is no ordering + * guarantee across the bag. + */ + for (;;) { + ck_bag_iterator_init(&iterator, &bag); + curr_max = prev_max = prev = -1; + + while (ck_bag_next(&iterator, &curr_ptr)) { + + if (block != iterator.block) { + prev = -1; + curr = 0; + prev_max = curr_max; + curr_max = 0; + block = iterator.block; + } + + curr = (uintptr_t)(curr_ptr); + if (curr < prev) { + /* Ascending order within block violated */ + fprintf(stderr, "ERROR: %ju < %ju \n", + (uintmax_t)curr, (uintmax_t)prev); + exit(EXIT_FAILURE); + } else if (prev_max != -1 && curr > prev_max) { + /* Max of prev block > max of current block */ + fprintf(stderr, "ERROR: %ju > prev_max: %ju\n", + (uintmax_t)curr, (uintmax_t)prev_max); + exit(EXIT_FAILURE); + } + + curr_max = curr; + + prev = curr; + n_entries++; + } + + iterations++; + if (ck_pr_load_int(&leave) == 1) + break; + } + + fprintf(stderr, "Read %llu entries in %llu iterations.\n", n_entries, iterations); + + ck_pr_inc_uint(&barrier); + while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS) + ck_pr_stall(); + + return NULL; + +} + +static void * +writer_thread(void *unused) +{ + unsigned int i; + + (void)unused; + + for (;;) { + for (i = 0; i < WRITER_MAX; i++) + ck_bag_put_spmc(&bag, (void *)(uintptr_t)i); + + if (ck_pr_load_int(&leave) == 1) + break; + + for (i = 0; i < WRITER_MAX >> 1; i++) { + void *replace = (void *)(uintptr_t)i; + if (ck_bag_set_spmc(&bag, (void *)(uintptr_t)i, replace) == false || + replace != (void *)(uintptr_t)i) { + fprintf(stderr, "ERROR: set %ju != %ju", + (uintmax_t)(uintptr_t)replace, (uintmax_t)i); + exit(EXIT_FAILURE); + } + } + + if (ck_pr_load_int(&leave) == 1) + break; + + for (i = WRITER_MAX; i > 1; i--) + ck_bag_remove_spmc(&bag, (void *)((uintptr_t)i - 1)); + } + + while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS) + ck_pr_stall(); + + return NULL; +} + +int +main(int argc, char **argv) +{ + pthread_t *readers; + pthread_t writer; + unsigned int i, curr; + void *curr_ptr; + ck_bag_iterator_t bag_it; + + (void)argc; + (void)argv; + + ck_bag_allocator_set(&allocator, 0); + ck_bag_init(&bag, CK_BAG_DEFAULT, CK_BAG_ALLOCATE_GEOMETRIC); + + fprintf(stderr, "Block Size: %zuB\n", bag.info.bytes); + + /* Sequential test */ + for (i = 0; i < 10; i++) + ck_bag_put_spmc(&bag, (void *)(uintptr_t)i); + + ck_bag_iterator_init(&bag_it, &bag); + while (ck_bag_next(&bag_it, &curr_ptr)) { + curr = (uintptr_t)(curr_ptr); + if (curr > (uintptr_t)i) + fprintf(stderr, "ERROR: %ju != %u", (uintmax_t)curr, i); + + ck_bag_remove_spmc(&bag, curr_ptr); + } + + /* Concurrent test */ + pthread_create(&writer, NULL, writer_thread, NULL); + readers = malloc(sizeof(pthread_t) * NUM_READER_THREADS); + for (i = 0; i < NUM_READER_THREADS; i++) { + pthread_create(&readers[i], NULL, reader, NULL); + } + + sleep(20); + + ck_pr_store_int(&leave, 1); + for (i = 0; i < NUM_READER_THREADS; i++) + pthread_join(readers[i], NULL); + + pthread_join(writer, NULL); + fprintf(stderr, "Current Entries: %u\n", ck_bag_count(&bag)); + ck_bag_destroy(&bag); + return 0; +} + diff --git a/src/ck_bag.c b/src/ck_bag.c new file mode 100644 index 0000000..829c3b7 --- /dev/null +++ b/src/ck_bag.c @@ -0,0 +1,365 @@ +/* + * Copyright 2012 Abel P. Mathew + * Copyright 2012 Samy Al Bahra + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include + +#if CK_MD_CACHELINE > 128 +#define CK_BAG_CL_SIZE 128 +#else +#define CK_BAG_CL_SIZE CK_MD_CACHELINE +#endif + +#define CK_BAG_PAGE_SIZE CK_MD_PAGE_SIZE + +#define CK_BAG_MAX_N_ENTRIES (1 << 12) + +static struct ck_malloc allocator; +static size_t allocator_overhead; + +bool +ck_bag_init(struct ck_bag *bag, + size_t n_block_entries, + enum ck_bag_allocation_strategy as) +{ + + size_t block_overhead; + if (bag == NULL) + return false; + + bag->avail_head = bag->avail_tail = NULL; + bag->head = NULL; + bag->n_entries = 0; + bag->alloc_strat = as; + + if (n_block_entries > CK_BAG_MAX_N_ENTRIES) + return false; + + /* + * By default, a ck_bag_block occupies two cachelines. If n_entries is less + * than the # of entries that can fit within one cacheline (including + * overhead), then bag->info.max = number of entries that can fit into a + * single cacheline. + */ + block_overhead = sizeof(struct ck_bag_block) + allocator_overhead; + + if (n_block_entries == CK_BAG_DEFAULT) { + bag->info.max = ((CK_BAG_PAGE_SIZE - block_overhead) / sizeof(void *)); + } else { + bag->info.max = ((CK_BAG_CL_SIZE - block_overhead) / sizeof(void *)); + if (n_block_entries > bag->info.max) + bag->info.max = n_block_entries; + } + + bag->info.bytes = block_overhead + sizeof(void *) * bag->info.max; + + return true; +} + +void +ck_bag_destroy(struct ck_bag *bag) +{ + struct ck_bag_block *cursor; + + cursor = bag->head; + while (bag->head != NULL) { + cursor = bag->head; + bag->head = ck_bag_block_next(cursor->next); + allocator.free(cursor, bag->info.bytes, true); + } + + return; +} + +bool +ck_bag_allocator_set(struct ck_malloc *m, size_t alloc_overhead) +{ + + if (m->malloc == NULL || m->free == NULL) + return false; + + allocator = *m; + allocator_overhead = alloc_overhead; + return true; +} + +bool +ck_bag_put_spmc(struct ck_bag *bag, void *entry) +{ + + struct ck_bag_block *cursor, *new_block, *new_block_prev, *new_tail; + uint16_t n_entries_block; + size_t blocks_alloc, i; + uintptr_t next; + + new_block = new_block_prev = new_tail = NULL; + + /* + * Blocks with available entries are stored in + * the bag's available list. + */ + cursor = bag->avail_head; + if (cursor != NULL) { + n_entries_block = ck_bag_block_count(cursor); + } else { + /* The bag is full, allocate a new set of blocks */ + if (bag->alloc_strat == CK_BAG_ALLOCATE_GEOMETRIC) + blocks_alloc = (bag->n_blocks + 1) << 1; + else + blocks_alloc = 1; + + for (i = 0; i < blocks_alloc-1; i++) { + new_block = allocator.malloc(bag->info.bytes); + + if (new_block == NULL) + return false; + + /* + * First node is the tail of the Bag. + * Second node is the new tail of the Available list. + */ + if (i == 0) + new_tail = new_block; + + new_block->next = new_block_prev; + new_block->avail_next = new_block_prev; + if (new_block_prev != NULL) + new_block_prev->avail_prev = new_block; + + new_block_prev = new_block; + } + + /* + * Insert entry into last allocated block. + * cursor is new head of available list. + */ + cursor = allocator.malloc(bag->info.bytes); + cursor->avail_next = new_block; + cursor->avail_prev = NULL; + new_block->avail_prev = cursor; + n_entries_block = 0; + bag->n_blocks += blocks_alloc; + } + + /* Update the Available List */ + if (new_block != NULL) { + if (bag->avail_tail != NULL) { + cursor->avail_prev = bag->avail_tail; + bag->avail_tail->avail_next = cursor; + } else { + /* Available list was previously empty */ + bag->avail_head = cursor; + } + + bag->avail_tail = new_tail; + } else { + /* New entry will fill up block, remove from avail list */ + if (n_entries_block == bag->info.max-1) { + if (cursor->avail_prev != NULL) + cursor->avail_prev->avail_next = cursor->avail_next; + + if (cursor->avail_next != NULL) + cursor->avail_next->avail_prev = cursor->avail_prev; + + if (bag->avail_head == cursor) + bag->avail_head = cursor->avail_next; + + if (bag->avail_tail == cursor) + bag->avail_tail = cursor->avail_prev; + + /* For debugging purposes */ + cursor->avail_next = NULL; + cursor->avail_prev = NULL; + } + } + + /* Update array and block->n_entries */ + cursor->array[n_entries_block++] = entry; + next = ((uintptr_t)n_entries_block << 48); + ck_pr_fence_store(); + + /* Update bag's list */ + if (bag->head == NULL || n_entries_block == 1) { + next += ((uintptr_t)(void *)ck_bag_block_next(bag->head)); + ck_pr_store_ptr(&cursor->next, (void *)next); + ck_pr_store_ptr(&bag->head, cursor); + } else { + next += ((uintptr_t)(void *)ck_bag_block_next(cursor->next)); + ck_pr_store_ptr(&cursor->next, (void *)next); + } + ck_pr_store_uint(&bag->n_entries, bag->n_entries + 1); + + return true; +} + +/* + * Set + * Replace prev_entry with new entry if exists, otherwise insert into bag + * + * On return, new_entry = prev_entry on replacement, NULL on insertion. + */ +bool +ck_bag_set_spmc(struct ck_bag *bag, void *compare, void *update) +{ + + struct ck_bag_block *cursor; + uint16_t block_index; + uint16_t n_entries_block = 0; + + cursor = bag->head; + while (cursor != NULL) { + n_entries_block = ck_bag_block_count(cursor); + for (block_index = 0; block_index < n_entries_block; block_index++) { + if (cursor->array[block_index] != compare) + continue; + + ck_pr_store_ptr(&cursor->array[block_index], update); + return true; + } + + cursor = ck_bag_block_next(cursor->next); + } + + return ck_bag_put_spmc(bag, update); +} + +bool +ck_bag_remove_spmc(struct ck_bag *bag, void *entry) +{ + struct ck_bag_block *cursor, *copy, *prev; + uint16_t block_index, n_entries; + + if (bag == NULL) + return -1; + + cursor = bag->head; + prev = NULL; + while (cursor != NULL) { + n_entries = ck_bag_block_count(cursor); + for (block_index = 0; block_index < n_entries; block_index++) { + if (cursor->array[block_index] == entry) + goto found; + + } + prev = cursor; + cursor = ck_bag_block_next(cursor->next); + } + + return true; + +found: + /* Cursor points to containing block, block_index is index of deletion */ + if (n_entries == 1) { + /* If a block's single entry is being removed, remove the block. */ + if (prev == NULL) { + struct ck_bag_block *new_head = ck_bag_block_next(cursor->next); + ck_pr_store_ptr(&bag->head, new_head); + } else { + uintptr_t next = ((uintptr_t)prev->next & (CK_BAG_BLOCK_ENTRIES_MASK)) | + (uintptr_t)(void *)ck_bag_block_next(cursor->next); + ck_pr_store_ptr(&prev->next, (struct ck_bag_block *)next); + } + + /* Remove block from available list */ + if (cursor->avail_prev != NULL) + cursor->avail_prev->avail_next = cursor->avail_next; + + if (cursor->avail_next != NULL) + cursor->avail_next->avail_prev = cursor->avail_prev; + + copy = cursor->avail_next; + } else { + uintptr_t next_ptr; + copy = allocator.malloc(bag->info.bytes); + + if (copy == NULL) + return false; + + memcpy(copy, cursor, bag->info.bytes); + copy->array[block_index] = copy->array[--n_entries]; + + ck_pr_fence_store(); + + next_ptr = (uintptr_t)(void *)ck_bag_block_next(copy->next); + copy->next = (void *)(((uintptr_t)n_entries << 48) | next_ptr); + + if (prev == NULL) { + ck_pr_store_ptr(&bag->head, copy); + } else { + uintptr_t next = ((uintptr_t)prev->next & (CK_BAG_BLOCK_ENTRIES_MASK)) | + (uintptr_t)(void *)ck_bag_block_next(copy); + ck_pr_store_ptr(&prev->next, (struct ck_bag_block *)next); + } + + if (n_entries == bag->info.max - 1) { + /* Block was previously fully, add to head of avail. list */ + copy->avail_next = bag->avail_head; + copy->avail_prev = NULL; + bag->avail_head = copy; + } + + } + + /* Update available list. */ + if (bag->avail_head == cursor) + bag->avail_head = copy; + + if (bag->avail_tail == cursor) + bag->avail_tail = copy; + + allocator.free(cursor, sizeof(struct ck_bag_block), true); + ck_pr_store_uint(&bag->n_entries, bag->n_entries - 1); + return true; +} + +bool +ck_bag_member_spmc(struct ck_bag *bag, void *entry) +{ + struct ck_bag_block *cursor; + uint16_t block_index, n_entries; + + if (bag->head == NULL) + return NULL; + + cursor = ck_pr_load_ptr(&bag->head); + while (cursor != NULL) { + n_entries = ck_bag_block_count(cursor); + for (block_index = 0; block_index < n_entries; block_index++) { + if (ck_pr_load_ptr(&cursor->array[block_index]) == entry) + return true; + } + cursor = ck_bag_block_next(ck_pr_load_ptr(&cursor->next)); + } + + return false; +} +