From 431c24a90ad7e2782e02e27e2142a53de8d9455b Mon Sep 17 00:00:00 2001 From: Abel Mathew Date: Tue, 10 Apr 2012 16:03:38 +0000 Subject: [PATCH] ck_bag: Lock-Free SPMC bag/collection for x86_64. A bag is a linked list of blocks, with each block containing an array. Insertions are on the order of O(1) and deletions are on the order of O(N). This data structure is meant to act as a lock-free vector implementation. Signed-off-by: Samy Al Bahra --- include/ck_bag.h | 171 +++++++++++++ include/ck_md.h | 4 + regressions/ck_bag/validate/Makefile | 14 + regressions/ck_bag/validate/order.c | 218 ++++++++++++++++ src/ck_bag.c | 365 +++++++++++++++++++++++++++ 5 files changed, 772 insertions(+) create mode 100644 include/ck_bag.h create mode 100644 regressions/ck_bag/validate/Makefile create mode 100644 regressions/ck_bag/validate/order.c create mode 100644 src/ck_bag.c diff --git a/include/ck_bag.h b/include/ck_bag.h new file mode 100644 index 0000000..bec60c4 --- /dev/null +++ b/include/ck_bag.h @@ -0,0 +1,171 @@ +/* + * Copyright 2012 Abel P. Mathew + * Copyright 2012 Samy Al Bahra + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#ifndef _CK_BAG_H +#define _CK_BAG_H + +#include +#include +#include +#include +#include + +/* + * ck_bag is a lock-free spmc linked list of blocks. + * + * A block consists of: + * next: + * Linkage for bag linked list. + * avail_next, avail_prev: + * Linkage for bag's available linked list (to support 0(1) inserts). + * array: + * flexible array member. + * + * The top 3 bytes of "next" contain the # of entries within block->array. + * + * Valid entries in block->array are contigious and stored at the front + * of the array. Empty entries are stored at the back. + */ + + +/* + * Memory Allocation Strategies: + * GEOMETRIC = allocate # of existing blocks * 2. + * LINEAR = only allocate a single block. + */ +enum ck_bag_allocation_strategy { + CK_BAG_ALLOCATE_GEOMETRIC = 0, + CK_BAG_ALLOCATE_LINEAR +}; + +/* + * max: max n_entries per block + * bytes: sizeof(ck_bag_block) + sizeof(flex. array member) + * + inline allocator overhead + */ +struct ck_bag_block_info { + size_t max; + size_t bytes; +}; + +#define CK_BAG_DEFAULT 0 + +struct ck_bag_block { + struct ck_bag_block *next; + struct ck_bag_block *avail_next; + struct ck_bag_block *avail_prev; + + void *array[]; +} CK_CC_CACHELINE; + +struct ck_bag { + struct ck_bag_block *head; + struct ck_bag_block *avail_head; + struct ck_bag_block *avail_tail; + + unsigned int n_entries; + unsigned int n_blocks; + + enum ck_bag_allocation_strategy alloc_strat; + + struct ck_bag_block_info info; +}; +typedef struct ck_bag ck_bag_t; + +struct ck_bag_iterator { + struct ck_bag_block *block; + uint16_t index; +}; +typedef struct ck_bag_iterator ck_bag_iterator_t; + +#define CK_BAG_BLOCK_ENTRIES_MASK ((uintptr_t)0xFFFF << 48) + +CK_CC_INLINE static struct ck_bag_block * +ck_bag_block_next(struct ck_bag_block *block) +{ + + return (struct ck_bag_block *)((uintptr_t)block & ~CK_BAG_BLOCK_ENTRIES_MASK); +} + +CK_CC_INLINE static unsigned int +ck_bag_count(struct ck_bag *bag) +{ + + return ck_pr_load_uint(&bag->n_entries); +} + +CK_CC_INLINE static uint16_t +ck_bag_block_count(struct ck_bag_block *block) +{ + + return (uintptr_t)ck_pr_load_ptr(&block->next) >> 48; +} + +CK_CC_INLINE static void +ck_bag_iterator_init(ck_bag_iterator_t *iterator, ck_bag_t *bag) +{ + + iterator->block = ck_pr_load_ptr(&bag->head); + iterator->index = 0; + return; +} + +CK_CC_INLINE static bool +ck_bag_next(struct ck_bag_iterator *iterator, void **entry) +{ + uint16_t n_entries; + struct ck_bag_block *next; + + if (iterator->block == NULL) + return NULL; + + n_entries = ck_bag_block_count(iterator->block); + if (iterator->index >= n_entries) { + next = ck_pr_load_ptr(&iterator->block->next); + iterator->block = ck_bag_block_next(next); + if (iterator->block == NULL || ck_bag_block_count(iterator->block) == 0) + return false; + + iterator->index = 0; + } + + if (iterator->block == NULL) + return false; + + *entry = ck_pr_load_ptr(&iterator->block->array[iterator->index++]); + return true; +} + +bool ck_bag_init(struct ck_bag *, size_t, enum ck_bag_allocation_strategy); +bool ck_bag_allocator_set(struct ck_malloc *, size_t); +void ck_bag_destroy(ck_bag_t *); +bool ck_bag_put_spmc(ck_bag_t *, void *); +bool ck_bag_set_spmc(struct ck_bag *, void *, void *); +bool ck_bag_remove_spmc(ck_bag_t *, void *); +bool ck_bag_member_spmc(ck_bag_t *, void *); + +#endif /* CK_BAG_H */ diff --git a/include/ck_md.h b/include/ck_md.h index 173e096..33c5a69 100644 --- a/include/ck_md.h +++ b/include/ck_md.h @@ -31,4 +31,8 @@ #define CK_MD_CACHELINE (64) #endif +#ifndef CK_MD_PAGE_SIZE +#define CK_MD_PAGE_SIZE (4096) +#endif + #endif /* _CK_MD_H */ diff --git a/regressions/ck_bag/validate/Makefile b/regressions/ck_bag/validate/Makefile new file mode 100644 index 0000000..563566f --- /dev/null +++ b/regressions/ck_bag/validate/Makefile @@ -0,0 +1,14 @@ +.PHONY: clean distribution + +OBJECTS=order + +all: $(OBJECTS) + +order: order.c ../../../include/ck_bag.h ../../../src/ck_bag.c + $(CC) $(CFLAGS) -o order order.c ../../../src/ck_bag.c + +clean: + rm -rf *.dSYM *~ *.o $(OBJECTS) + +include ../../../build/regressions.build +CFLAGS+=$(PTHREAD_CFLAGS) -D_GNU_SOURCE diff --git a/regressions/ck_bag/validate/order.c b/regressions/ck_bag/validate/order.c new file mode 100644 index 0000000..9625cee --- /dev/null +++ b/regressions/ck_bag/validate/order.c @@ -0,0 +1,218 @@ +/* + * Copyright 2012 Abel P. Mathew + * Copyright 2012 Samy Al Bahra + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include + +#include "../../common.h" + +#define NUM_READER_THREADS 8 +#define WRITER_MAX (1 << 17) +#define READ_LATENCY 8 + +static ck_bag_t bag; +static int leave; +static unsigned int barrier; + +static void * +bag_malloc(size_t r) +{ + + return malloc(r); +} + +static void +bag_free(void *p, size_t b, bool r) +{ + + (void)p; + (void)b; + (void)r; +// free(p); + return; +} + +static struct ck_malloc allocator = { + .malloc = bag_malloc, + .free = bag_free +}; + +static void * +reader(void *arg) +{ + void *curr_ptr; + intptr_t curr, prev, curr_max, prev_max; + unsigned long long n_entries = 0, iterations = 0; + + (void)arg; + + ck_bag_iterator_t iterator; + struct ck_bag_block *block = NULL; + + /* + * Check if entries within a block are sequential. Since ck_bag inserts + * newly occupied blocks at the beginning of the list, there is no ordering + * guarantee across the bag. + */ + for (;;) { + ck_bag_iterator_init(&iterator, &bag); + curr_max = prev_max = prev = -1; + + while (ck_bag_next(&iterator, &curr_ptr)) { + + if (block != iterator.block) { + prev = -1; + curr = 0; + prev_max = curr_max; + curr_max = 0; + block = iterator.block; + } + + curr = (uintptr_t)(curr_ptr); + if (curr < prev) { + /* Ascending order within block violated */ + fprintf(stderr, "ERROR: %ju < %ju \n", + (uintmax_t)curr, (uintmax_t)prev); + exit(EXIT_FAILURE); + } else if (prev_max != -1 && curr > prev_max) { + /* Max of prev block > max of current block */ + fprintf(stderr, "ERROR: %ju > prev_max: %ju\n", + (uintmax_t)curr, (uintmax_t)prev_max); + exit(EXIT_FAILURE); + } + + curr_max = curr; + + prev = curr; + n_entries++; + } + + iterations++; + if (ck_pr_load_int(&leave) == 1) + break; + } + + fprintf(stderr, "Read %llu entries in %llu iterations.\n", n_entries, iterations); + + ck_pr_inc_uint(&barrier); + while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS) + ck_pr_stall(); + + return NULL; + +} + +static void * +writer_thread(void *unused) +{ + unsigned int i; + + (void)unused; + + for (;;) { + for (i = 0; i < WRITER_MAX; i++) + ck_bag_put_spmc(&bag, (void *)(uintptr_t)i); + + if (ck_pr_load_int(&leave) == 1) + break; + + for (i = 0; i < WRITER_MAX >> 1; i++) { + void *replace = (void *)(uintptr_t)i; + if (ck_bag_set_spmc(&bag, (void *)(uintptr_t)i, replace) == false || + replace != (void *)(uintptr_t)i) { + fprintf(stderr, "ERROR: set %ju != %ju", + (uintmax_t)(uintptr_t)replace, (uintmax_t)i); + exit(EXIT_FAILURE); + } + } + + if (ck_pr_load_int(&leave) == 1) + break; + + for (i = WRITER_MAX; i > 1; i--) + ck_bag_remove_spmc(&bag, (void *)((uintptr_t)i - 1)); + } + + while (ck_pr_load_uint(&barrier) != NUM_READER_THREADS) + ck_pr_stall(); + + return NULL; +} + +int +main(int argc, char **argv) +{ + pthread_t *readers; + pthread_t writer; + unsigned int i, curr; + void *curr_ptr; + ck_bag_iterator_t bag_it; + + (void)argc; + (void)argv; + + ck_bag_allocator_set(&allocator, 0); + ck_bag_init(&bag, CK_BAG_DEFAULT, CK_BAG_ALLOCATE_GEOMETRIC); + + fprintf(stderr, "Block Size: %zuB\n", bag.info.bytes); + + /* Sequential test */ + for (i = 0; i < 10; i++) + ck_bag_put_spmc(&bag, (void *)(uintptr_t)i); + + ck_bag_iterator_init(&bag_it, &bag); + while (ck_bag_next(&bag_it, &curr_ptr)) { + curr = (uintptr_t)(curr_ptr); + if (curr > (uintptr_t)i) + fprintf(stderr, "ERROR: %ju != %u", (uintmax_t)curr, i); + + ck_bag_remove_spmc(&bag, curr_ptr); + } + + /* Concurrent test */ + pthread_create(&writer, NULL, writer_thread, NULL); + readers = malloc(sizeof(pthread_t) * NUM_READER_THREADS); + for (i = 0; i < NUM_READER_THREADS; i++) { + pthread_create(&readers[i], NULL, reader, NULL); + } + + sleep(20); + + ck_pr_store_int(&leave, 1); + for (i = 0; i < NUM_READER_THREADS; i++) + pthread_join(readers[i], NULL); + + pthread_join(writer, NULL); + fprintf(stderr, "Current Entries: %u\n", ck_bag_count(&bag)); + ck_bag_destroy(&bag); + return 0; +} + diff --git a/src/ck_bag.c b/src/ck_bag.c new file mode 100644 index 0000000..829c3b7 --- /dev/null +++ b/src/ck_bag.c @@ -0,0 +1,365 @@ +/* + * Copyright 2012 Abel P. Mathew + * Copyright 2012 Samy Al Bahra + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include + +#if CK_MD_CACHELINE > 128 +#define CK_BAG_CL_SIZE 128 +#else +#define CK_BAG_CL_SIZE CK_MD_CACHELINE +#endif + +#define CK_BAG_PAGE_SIZE CK_MD_PAGE_SIZE + +#define CK_BAG_MAX_N_ENTRIES (1 << 12) + +static struct ck_malloc allocator; +static size_t allocator_overhead; + +bool +ck_bag_init(struct ck_bag *bag, + size_t n_block_entries, + enum ck_bag_allocation_strategy as) +{ + + size_t block_overhead; + if (bag == NULL) + return false; + + bag->avail_head = bag->avail_tail = NULL; + bag->head = NULL; + bag->n_entries = 0; + bag->alloc_strat = as; + + if (n_block_entries > CK_BAG_MAX_N_ENTRIES) + return false; + + /* + * By default, a ck_bag_block occupies two cachelines. If n_entries is less + * than the # of entries that can fit within one cacheline (including + * overhead), then bag->info.max = number of entries that can fit into a + * single cacheline. + */ + block_overhead = sizeof(struct ck_bag_block) + allocator_overhead; + + if (n_block_entries == CK_BAG_DEFAULT) { + bag->info.max = ((CK_BAG_PAGE_SIZE - block_overhead) / sizeof(void *)); + } else { + bag->info.max = ((CK_BAG_CL_SIZE - block_overhead) / sizeof(void *)); + if (n_block_entries > bag->info.max) + bag->info.max = n_block_entries; + } + + bag->info.bytes = block_overhead + sizeof(void *) * bag->info.max; + + return true; +} + +void +ck_bag_destroy(struct ck_bag *bag) +{ + struct ck_bag_block *cursor; + + cursor = bag->head; + while (bag->head != NULL) { + cursor = bag->head; + bag->head = ck_bag_block_next(cursor->next); + allocator.free(cursor, bag->info.bytes, true); + } + + return; +} + +bool +ck_bag_allocator_set(struct ck_malloc *m, size_t alloc_overhead) +{ + + if (m->malloc == NULL || m->free == NULL) + return false; + + allocator = *m; + allocator_overhead = alloc_overhead; + return true; +} + +bool +ck_bag_put_spmc(struct ck_bag *bag, void *entry) +{ + + struct ck_bag_block *cursor, *new_block, *new_block_prev, *new_tail; + uint16_t n_entries_block; + size_t blocks_alloc, i; + uintptr_t next; + + new_block = new_block_prev = new_tail = NULL; + + /* + * Blocks with available entries are stored in + * the bag's available list. + */ + cursor = bag->avail_head; + if (cursor != NULL) { + n_entries_block = ck_bag_block_count(cursor); + } else { + /* The bag is full, allocate a new set of blocks */ + if (bag->alloc_strat == CK_BAG_ALLOCATE_GEOMETRIC) + blocks_alloc = (bag->n_blocks + 1) << 1; + else + blocks_alloc = 1; + + for (i = 0; i < blocks_alloc-1; i++) { + new_block = allocator.malloc(bag->info.bytes); + + if (new_block == NULL) + return false; + + /* + * First node is the tail of the Bag. + * Second node is the new tail of the Available list. + */ + if (i == 0) + new_tail = new_block; + + new_block->next = new_block_prev; + new_block->avail_next = new_block_prev; + if (new_block_prev != NULL) + new_block_prev->avail_prev = new_block; + + new_block_prev = new_block; + } + + /* + * Insert entry into last allocated block. + * cursor is new head of available list. + */ + cursor = allocator.malloc(bag->info.bytes); + cursor->avail_next = new_block; + cursor->avail_prev = NULL; + new_block->avail_prev = cursor; + n_entries_block = 0; + bag->n_blocks += blocks_alloc; + } + + /* Update the Available List */ + if (new_block != NULL) { + if (bag->avail_tail != NULL) { + cursor->avail_prev = bag->avail_tail; + bag->avail_tail->avail_next = cursor; + } else { + /* Available list was previously empty */ + bag->avail_head = cursor; + } + + bag->avail_tail = new_tail; + } else { + /* New entry will fill up block, remove from avail list */ + if (n_entries_block == bag->info.max-1) { + if (cursor->avail_prev != NULL) + cursor->avail_prev->avail_next = cursor->avail_next; + + if (cursor->avail_next != NULL) + cursor->avail_next->avail_prev = cursor->avail_prev; + + if (bag->avail_head == cursor) + bag->avail_head = cursor->avail_next; + + if (bag->avail_tail == cursor) + bag->avail_tail = cursor->avail_prev; + + /* For debugging purposes */ + cursor->avail_next = NULL; + cursor->avail_prev = NULL; + } + } + + /* Update array and block->n_entries */ + cursor->array[n_entries_block++] = entry; + next = ((uintptr_t)n_entries_block << 48); + ck_pr_fence_store(); + + /* Update bag's list */ + if (bag->head == NULL || n_entries_block == 1) { + next += ((uintptr_t)(void *)ck_bag_block_next(bag->head)); + ck_pr_store_ptr(&cursor->next, (void *)next); + ck_pr_store_ptr(&bag->head, cursor); + } else { + next += ((uintptr_t)(void *)ck_bag_block_next(cursor->next)); + ck_pr_store_ptr(&cursor->next, (void *)next); + } + ck_pr_store_uint(&bag->n_entries, bag->n_entries + 1); + + return true; +} + +/* + * Set + * Replace prev_entry with new entry if exists, otherwise insert into bag + * + * On return, new_entry = prev_entry on replacement, NULL on insertion. + */ +bool +ck_bag_set_spmc(struct ck_bag *bag, void *compare, void *update) +{ + + struct ck_bag_block *cursor; + uint16_t block_index; + uint16_t n_entries_block = 0; + + cursor = bag->head; + while (cursor != NULL) { + n_entries_block = ck_bag_block_count(cursor); + for (block_index = 0; block_index < n_entries_block; block_index++) { + if (cursor->array[block_index] != compare) + continue; + + ck_pr_store_ptr(&cursor->array[block_index], update); + return true; + } + + cursor = ck_bag_block_next(cursor->next); + } + + return ck_bag_put_spmc(bag, update); +} + +bool +ck_bag_remove_spmc(struct ck_bag *bag, void *entry) +{ + struct ck_bag_block *cursor, *copy, *prev; + uint16_t block_index, n_entries; + + if (bag == NULL) + return -1; + + cursor = bag->head; + prev = NULL; + while (cursor != NULL) { + n_entries = ck_bag_block_count(cursor); + for (block_index = 0; block_index < n_entries; block_index++) { + if (cursor->array[block_index] == entry) + goto found; + + } + prev = cursor; + cursor = ck_bag_block_next(cursor->next); + } + + return true; + +found: + /* Cursor points to containing block, block_index is index of deletion */ + if (n_entries == 1) { + /* If a block's single entry is being removed, remove the block. */ + if (prev == NULL) { + struct ck_bag_block *new_head = ck_bag_block_next(cursor->next); + ck_pr_store_ptr(&bag->head, new_head); + } else { + uintptr_t next = ((uintptr_t)prev->next & (CK_BAG_BLOCK_ENTRIES_MASK)) | + (uintptr_t)(void *)ck_bag_block_next(cursor->next); + ck_pr_store_ptr(&prev->next, (struct ck_bag_block *)next); + } + + /* Remove block from available list */ + if (cursor->avail_prev != NULL) + cursor->avail_prev->avail_next = cursor->avail_next; + + if (cursor->avail_next != NULL) + cursor->avail_next->avail_prev = cursor->avail_prev; + + copy = cursor->avail_next; + } else { + uintptr_t next_ptr; + copy = allocator.malloc(bag->info.bytes); + + if (copy == NULL) + return false; + + memcpy(copy, cursor, bag->info.bytes); + copy->array[block_index] = copy->array[--n_entries]; + + ck_pr_fence_store(); + + next_ptr = (uintptr_t)(void *)ck_bag_block_next(copy->next); + copy->next = (void *)(((uintptr_t)n_entries << 48) | next_ptr); + + if (prev == NULL) { + ck_pr_store_ptr(&bag->head, copy); + } else { + uintptr_t next = ((uintptr_t)prev->next & (CK_BAG_BLOCK_ENTRIES_MASK)) | + (uintptr_t)(void *)ck_bag_block_next(copy); + ck_pr_store_ptr(&prev->next, (struct ck_bag_block *)next); + } + + if (n_entries == bag->info.max - 1) { + /* Block was previously fully, add to head of avail. list */ + copy->avail_next = bag->avail_head; + copy->avail_prev = NULL; + bag->avail_head = copy; + } + + } + + /* Update available list. */ + if (bag->avail_head == cursor) + bag->avail_head = copy; + + if (bag->avail_tail == cursor) + bag->avail_tail = copy; + + allocator.free(cursor, sizeof(struct ck_bag_block), true); + ck_pr_store_uint(&bag->n_entries, bag->n_entries - 1); + return true; +} + +bool +ck_bag_member_spmc(struct ck_bag *bag, void *entry) +{ + struct ck_bag_block *cursor; + uint16_t block_index, n_entries; + + if (bag->head == NULL) + return NULL; + + cursor = ck_pr_load_ptr(&bag->head); + while (cursor != NULL) { + n_entries = ck_bag_block_count(cursor); + for (block_index = 0; block_index < n_entries; block_index++) { + if (ck_pr_load_ptr(&cursor->array[block_index]) == entry) + return true; + } + cursor = ck_bag_block_next(ck_pr_load_ptr(&cursor->next)); + } + + return false; +} +