diff --git a/include/ck_epoch.h b/include/ck_epoch.h index f3f911e..33dcc1c 100644 --- a/include/ck_epoch.h +++ b/include/ck_epoch.h @@ -38,19 +38,20 @@ #include #include -/* - * CK_EPOCH_LENGTH must be a power of 2. - */ #ifndef CK_EPOCH_LENGTH #define CK_EPOCH_LENGTH 4 #endif struct ck_epoch_entry; typedef struct ck_epoch_entry ck_epoch_entry_t; -typedef void (*ck_epoch_destructor_t)(ck_epoch_entry_t *); +typedef void ck_epoch_cb_t(ck_epoch_entry_t *); +/* + * This should be embedded into objects you wish to be the target of + * ck_epoch_cb_t functions (with ck_epoch_call). + */ struct ck_epoch_entry { - ck_epoch_destructor_t destroy; + ck_epoch_cb_t *function; ck_stack_entry_t stack_entry; }; @@ -59,17 +60,14 @@ struct ck_epoch_entry { */ #define CK_EPOCH_CONTAINER(T, M, N) CK_CC_CONTAINER(struct ck_epoch_entry, T, M, N) -struct ck_epoch; struct ck_epoch_record { - unsigned int active; + unsigned int state; unsigned int epoch; - ck_stack_t pending[CK_EPOCH_LENGTH]; + unsigned int active; unsigned int n_pending; - unsigned int status; - unsigned int delta; unsigned int n_peak; - uint64_t n_reclamations; - struct ck_epoch *global; + unsigned long n_dispatch; + ck_stack_t pending[CK_EPOCH_LENGTH]; ck_stack_entry_t record_next; } CK_CC_CACHELINE; typedef struct ck_epoch_record ck_epoch_record_t; @@ -78,13 +76,12 @@ struct ck_epoch { unsigned int epoch; char pad[CK_MD_CACHELINE - sizeof(unsigned int)]; ck_stack_t records; - unsigned int threshold; unsigned int n_free; }; typedef struct ck_epoch ck_epoch_t; CK_CC_INLINE static void -ck_epoch_read_begin(ck_epoch_record_t *record) +ck_epoch_begin(ck_epoch_t *epoch, ck_epoch_record_t *record) { /* @@ -92,8 +89,7 @@ ck_epoch_read_begin(ck_epoch_record_t *record) * section. */ if (record->active == 0) { - unsigned int g_epoch = ck_pr_load_uint(&record->global->epoch); - g_epoch &= CK_EPOCH_LENGTH - 1; + unsigned int g_epoch = ck_pr_load_uint(&epoch->epoch); ck_pr_store_uint(&record->epoch, g_epoch); } @@ -103,45 +99,41 @@ ck_epoch_read_begin(ck_epoch_record_t *record) } CK_CC_INLINE static void -ck_epoch_read_end(ck_epoch_record_t *record) +ck_epoch_end(ck_epoch_t *global, ck_epoch_record_t *record) { - ck_pr_fence_load(); - ck_pr_store_uint(&record->active, record->active - 1); - return; -} + (void)global; -CK_CC_INLINE static void -ck_epoch_write_end(ck_epoch_record_t *record) -{ - - ck_pr_fence_store(); + ck_pr_fence_memory(); ck_pr_store_uint(&record->active, record->active - 1); return; } +/* + * Defers the execution of the function pointed to by the "cb" + * argument until an epoch counter loop. This allows for a + * non-blocking deferral. 
+ */ CK_CC_INLINE static void -ck_epoch_retire(ck_epoch_record_t *record, ck_epoch_entry_t *entry, ck_epoch_destructor_t destroy) +ck_epoch_call(ck_epoch_record_t *record, + ck_epoch_entry_t *entry, + ck_epoch_cb_t *function) { + unsigned int offset = record->epoch & (CK_EPOCH_LENGTH - 1); - entry->destroy = destroy; - ck_stack_push_spnc(&record->pending[record->epoch], &entry->stack_entry); - record->n_pending += 1; - - if (record->n_pending > record->n_peak) - record->n_peak = record->n_pending; - + record->n_pending++; + entry->function = function; + ck_stack_push_spnc(&record->pending[offset], &entry->stack_entry); return; } -void ck_epoch_init(ck_epoch_t *, unsigned int); +void ck_epoch_init(ck_epoch_t *); ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *); void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *); void ck_epoch_unregister(ck_epoch_record_t *); -void ck_epoch_tick(ck_epoch_t *, ck_epoch_record_t *); -bool ck_epoch_reclaim(ck_epoch_record_t *); -void ck_epoch_write_begin(ck_epoch_record_t *); -void ck_epoch_free(ck_epoch_record_t *, ck_epoch_entry_t *, ck_epoch_destructor_t); -void ck_epoch_purge(ck_epoch_record_t *record); +bool ck_epoch_poll(ck_epoch_t *, ck_epoch_record_t *); +void ck_epoch_call(ck_epoch_record_t *, ck_epoch_entry_t *, ck_epoch_cb_t *); +void ck_epoch_synchronize(ck_epoch_t *, ck_epoch_record_t *); +void ck_epoch_barrier(ck_epoch_t *, ck_epoch_record_t *); #endif /* _CK_EPOCH_H */ diff --git a/regressions/ck_bag/validate/order.c b/regressions/ck_bag/validate/order.c index 30802a5..0b82828 100644 --- a/regressions/ck_bag/validate/order.c +++ b/regressions/ck_bag/validate/order.c @@ -74,7 +74,7 @@ bag_free(void *p, size_t b, bool r) (void)b; if (r == true) { - ck_epoch_free(&epoch_wr, &(--e)->epoch_entry, bag_destroy); + ck_epoch_call(&epoch_wr, &(--e)->epoch_entry, bag_destroy); } else { free(--e); } @@ -107,7 +107,7 @@ reader(void *arg) * guarantee across the bag. 
*/ for (;;) { - ck_epoch_read_begin(&epoch_record); + ck_epoch_begin(&epoch_bag, &epoch_record); ck_bag_iterator_init(&iterator, &bag); curr_max = prev_max = prev = -1; block = NULL; @@ -141,8 +141,7 @@ reader(void *arg) prev = curr; n_entries++; } - - ck_epoch_read_end(&epoch_record); + ck_epoch_end(&epoch_bag, &epoch_record); iterations++; if (ck_pr_load_int(&leave) == 1) @@ -169,7 +168,6 @@ writer_thread(void *unused) for (;;) { iteration++; - ck_epoch_write_begin(&epoch_wr); for (i = 1; i <= writer_max; i++) { if (ck_bag_put_spmc(&bag, (void *)(uintptr_t)i) == false) { perror("ck_bag_put_spmc"); @@ -206,7 +204,7 @@ writer_thread(void *unused) } } - ck_epoch_write_end(&epoch_wr); + ck_epoch_poll(&epoch_bag, &epoch_wr); } fprintf(stderr, "Writer %u iterations, %u writes per iteration.\n", iteration, writer_max); @@ -247,7 +245,7 @@ main(int argc, char **argv) writer_max = (unsigned int)r; } - ck_epoch_init(&epoch_bag, 100); + ck_epoch_init(&epoch_bag); ck_epoch_register(&epoch_bag, &epoch_wr); ck_bag_allocator_set(&allocator, sizeof(struct bag_epoch)); if (ck_bag_init(&bag, b, CK_BAG_ALLOCATE_GEOMETRIC) == false) { diff --git a/regressions/ck_epoch/validate/Makefile b/regressions/ck_epoch/validate/Makefile index 1d9aa82..b7e8567 100644 --- a/regressions/ck_epoch/validate/Makefile +++ b/regressions/ck_epoch/validate/Makefile @@ -1,15 +1,19 @@ .PHONY: check clean distribution -OBJECTS=ck_stack ck_stack_read +OBJECTS=ck_stack ck_epoch_synchronize ck_epoch_poll all: $(OBJECTS) check: all - ./ck_stack $(CORES) 10 1 - ./ck_stack_read $(CORES) 10 1 + ./ck_stack $(CORES) 1 + ./ck_epoch_synchronize $(CORES) 1 + ./ck_epoch_poll $(CORES) 1 -ck_stack_read: ck_stack_read.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c - $(CC) $(CFLAGS) -o ck_stack_read ck_stack_read.c ../../../src/ck_epoch.c +ck_epoch_synchronize: ck_epoch_synchronize.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c + $(CC) $(CFLAGS) -o ck_epoch_synchronize ck_epoch_synchronize.c ../../../src/ck_epoch.c + +ck_epoch_poll: ck_epoch_poll.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c + $(CC) $(CFLAGS) -o ck_epoch_poll ck_epoch_poll.c ../../../src/ck_epoch.c ck_stack: ck_stack.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c $(CC) $(CFLAGS) -o ck_stack ck_stack.c ../../../src/ck_epoch.c diff --git a/regressions/ck_epoch/validate/ck_stack_read.c b/regressions/ck_epoch/validate/ck_epoch_poll.c similarity index 85% rename from regressions/ck_epoch/validate/ck_stack_read.c rename to regressions/ck_epoch/validate/ck_epoch_poll.c index 20a1055..5a87293 100644 --- a/regressions/ck_epoch/validate/ck_stack_read.c +++ b/regressions/ck_epoch/validate/ck_epoch_poll.c @@ -44,14 +44,13 @@ #include "../../common.h" -static unsigned int threshold; static unsigned int n_threads; static unsigned int barrier; static unsigned int e_barrier; static unsigned int readers; #ifndef PAIRS -#define PAIRS 5000000 +#define PAIRS 1000000 #endif #ifndef ITERATE @@ -68,6 +67,7 @@ static ck_epoch_t stack_epoch; CK_STACK_CONTAINER(struct node, stack_entry, stack_container) CK_EPOCH_CONTAINER(struct node, epoch_entry, epoch_container) static struct affinity a; +static const char animate[] = "-/|\\"; static void destructor(ck_epoch_entry_t *p) @@ -111,13 +111,15 @@ read_thread(void *unused CK_CC_UNUSED) j = 0; for (;;) { - ck_epoch_read_begin(&record); + ck_epoch_begin(&stack_epoch, &record); CK_STACK_FOREACH(&stack, cursor) { - n = cursor; 
+ if (cursor == NULL) + continue; + + n = CK_STACK_NEXT(cursor); j++; - n++; } - ck_epoch_read_end(&record); + ck_epoch_end(&stack_epoch, &record); if (j != 0 && ck_pr_load_uint(&readers) == 0) ck_pr_store_uint(&readers, 1); @@ -168,32 +170,36 @@ thread(void *unused CK_CC_UNUSED) } for (i = 0; i < PAIRS; i++) { - ck_epoch_write_begin(&record); ck_stack_push_upmc(&stack, &entry[i]->stack_entry); - ck_epoch_write_end(&record); } while (ck_pr_load_uint(&readers) == 0) ck_pr_stall(); + fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] %2.2f: %c", (double)j / ITERATE, animate[i % strlen(animate)]); + for (i = 0; i < PAIRS; i++) { - ck_epoch_write_begin(&record); s = ck_stack_pop_upmc(&stack); - ck_epoch_write_end(&record); - e = stack_container(s); - ck_epoch_free(&record, &e->epoch_entry, destructor); + + ck_epoch_call(&record, &e->epoch_entry, destructor); + + if (i % 1024) + ck_epoch_poll(&stack_epoch, &record); + + if (i % 8192) + fprintf(stderr, "\b%c", animate[i % strlen(animate)]); } } - ck_pr_inc_uint(&e_barrier); - while (ck_pr_load_uint(&e_barrier) < n_threads); - - fprintf(stderr, "[W] Peak: %u (%2.2f%%)\n Reclamations: %" PRIu64 "\n\n", + ck_epoch_synchronize(&stack_epoch, &record); + fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n Reclamations: %lu\n\n", record.n_peak, (double)record.n_peak / ((double)PAIRS * ITERATE) * 100, - record.n_reclamations); + record.n_dispatch); + ck_pr_inc_uint(&e_barrier); + while (ck_pr_load_uint(&e_barrier) < n_threads); return (NULL); } @@ -203,19 +209,18 @@ main(int argc, char *argv[]) unsigned int i; pthread_t *threads; - if (argc != 4) { - fprintf(stderr, "Usage: stack \n"); + if (argc != 3) { + fprintf(stderr, "Usage: stack \n"); exit(EXIT_FAILURE); } n_threads = atoi(argv[1]); - threshold = atoi(argv[2]); - a.delta = atoi(argv[3]); + a.delta = atoi(argv[2]); a.request = 0; threads = malloc(sizeof(pthread_t) * n_threads); - ck_epoch_init(&stack_epoch, threshold); + ck_epoch_init(&stack_epoch); for (i = 0; i < n_threads - 1; i++) pthread_create(threads + i, NULL, read_thread, NULL); diff --git a/regressions/ck_epoch/validate/ck_epoch_synchronize.c b/regressions/ck_epoch/validate/ck_epoch_synchronize.c new file mode 100644 index 0000000..fcc2a20 --- /dev/null +++ b/regressions/ck_epoch/validate/ck_epoch_synchronize.c @@ -0,0 +1,246 @@ +/* + * Copyright 2010-2012 Samy Al Bahra. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../../common.h" + +static unsigned int n_threads; +static unsigned int barrier; +static unsigned int e_barrier; +static unsigned int readers; + +#ifndef PAIRS +#define PAIRS 5000000 +#endif + +#ifndef ITERATE +#define ITERATE 20 +#endif + +#ifndef PAIRS_S +#define PAIRS_S 10000 +#endif + +#ifndef ITERATE_S +#define ITERATE_S 20 +#endif + +struct node { + unsigned int value; + ck_stack_entry_t stack_entry; + ck_epoch_entry_t epoch_entry; +}; +static ck_stack_t stack = CK_STACK_INITIALIZER; +static ck_epoch_t stack_epoch; +CK_STACK_CONTAINER(struct node, stack_entry, stack_container) +CK_EPOCH_CONTAINER(struct node, epoch_entry, epoch_container) +static struct affinity a; +static const char animate[] = "-/|\\"; + +static void +destructor(ck_epoch_entry_t *p) +{ + struct node *e = epoch_container(p); + + free(e); + return; +} + +static void * +read_thread(void *unused CK_CC_UNUSED) +{ + unsigned int j; + ck_epoch_record_t record CK_CC_CACHELINE; + ck_stack_entry_t *cursor; + + /* + * This is redundant post-incremented in order to silence some + * irrelevant GCC warnings. It is volatile in order to prevent + * elimination. 
+ */ + volatile ck_stack_entry_t *n; + + ck_epoch_register(&stack_epoch, &record); + + if (aff_iterate(&a)) { + perror("ERROR: failed to affine thread"); + exit(EXIT_FAILURE); + } + + ck_pr_inc_uint(&barrier); + while (ck_pr_load_uint(&barrier) < n_threads); + + while (CK_STACK_ISEMPTY(&stack) == true) { + if (ck_pr_load_uint(&readers) != 0) + break; + + ck_pr_stall(); + } + + j = 0; + for (;;) { + ck_epoch_begin(&stack_epoch, &record); + CK_STACK_FOREACH(&stack, cursor) { + if (cursor == NULL) + continue; + + n = CK_STACK_NEXT(cursor); + j++; + } + ck_epoch_end(&stack_epoch, &record); + + if (j != 0 && ck_pr_load_uint(&readers) == 0) + ck_pr_store_uint(&readers, 1); + + if (CK_STACK_ISEMPTY(&stack) == true && + ck_pr_load_uint(&e_barrier) != 0) + break; + } + + ck_pr_inc_uint(&e_barrier); + while (ck_pr_load_uint(&e_barrier) < n_threads); + + fprintf(stderr, "[R] Observed entries: %u\n", j); + return (NULL); +} + +static void * +thread(void *unused CK_CC_UNUSED) +{ + struct node **entry, *e; + unsigned int i, j; + ck_epoch_record_t record; + ck_stack_entry_t *s; + + ck_epoch_register(&stack_epoch, &record); + + if (aff_iterate(&a)) { + perror("ERROR: failed to affine thread"); + exit(EXIT_FAILURE); + } + + ck_pr_inc_uint(&barrier); + while (ck_pr_load_uint(&barrier) < n_threads); + + entry = malloc(sizeof(struct node *) * PAIRS_S); + if (entry == NULL) { + fprintf(stderr, "Failed allocation.\n"); + exit(EXIT_FAILURE); + } + + for (j = 0; j < ITERATE_S; j++) { + for (i = 0; i < PAIRS_S; i++) { + entry[i] = malloc(sizeof(struct node)); + if (entry == NULL) { + fprintf(stderr, "Failed individual allocation\n"); + exit(EXIT_FAILURE); + } + } + + for (i = 0; i < PAIRS_S; i++) { + ck_stack_push_upmc(&stack, &entry[i]->stack_entry); + } + + while (ck_pr_load_uint(&readers) == 0) + ck_pr_stall(); + + fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] %2.2f: %c", (double)j / ITERATE_S, animate[i % strlen(animate)]); + + for (i = 0; i < PAIRS_S; i++) { + s = ck_stack_pop_upmc(&stack); + e = stack_container(s); + + ck_epoch_synchronize(&stack_epoch, &record); + + if (i & 1) { + ck_epoch_call(&record, &e->epoch_entry, destructor); + } else { + destructor(&e->epoch_entry); + } + + if (i % 8192) { + fprintf(stderr, "\b%c", animate[i % strlen(animate)]); + } + } + } + + ck_epoch_synchronize(&stack_epoch, &record); + fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n Reclamations: %lu\n\n", + record.n_peak, + (double)record.n_peak / ((double)PAIRS * ITERATE) * 100, + record.n_dispatch); + + ck_pr_inc_uint(&e_barrier); + while (ck_pr_load_uint(&e_barrier) < n_threads); + return (NULL); +} + +int +main(int argc, char *argv[]) +{ + unsigned int i; + pthread_t *threads; + + if (argc != 3) { + fprintf(stderr, "Usage: stack \n"); + exit(EXIT_FAILURE); + } + + n_threads = atoi(argv[1]); + a.delta = atoi(argv[2]); + a.request = 0; + + threads = malloc(sizeof(pthread_t) * n_threads); + + ck_epoch_init(&stack_epoch); + + for (i = 0; i < n_threads - 1; i++) + pthread_create(threads + i, NULL, read_thread, NULL); + + pthread_create(threads + i, NULL, thread, NULL); + + for (i = 0; i < n_threads; i++) + pthread_join(threads[i], NULL); + + return (0); +} diff --git a/regressions/ck_epoch/validate/ck_stack.c b/regressions/ck_epoch/validate/ck_stack.c index f9ced5c..45777fb 100644 --- a/regressions/ck_epoch/validate/ck_stack.c +++ b/regressions/ck_epoch/validate/ck_stack.c @@ -44,7 +44,6 @@ #include "../../common.h" -static unsigned int threshold; static unsigned int n_threads; static 
unsigned int barrier; static unsigned int e_barrier; @@ -77,9 +76,10 @@ static void * thread(void *unused CK_CC_UNUSED) { struct node **entry, *e; - unsigned int i; ck_epoch_record_t record; ck_stack_entry_t *s; + unsigned long smr = 0; + unsigned int i; ck_epoch_register(&stack_epoch, &record); @@ -106,28 +106,27 @@ thread(void *unused CK_CC_UNUSED) while (ck_pr_load_uint(&barrier) < n_threads); for (i = 0; i < PAIRS; i++) { - ck_epoch_write_begin(&record); + ck_epoch_begin(&stack_epoch, &record); ck_stack_push_upmc(&stack, &entry[i]->stack_entry); - ck_epoch_write_end(&record); - - ck_epoch_write_begin(&record); s = ck_stack_pop_upmc(&stack); - ck_epoch_write_end(&record); + ck_epoch_end(&stack_epoch, &record); e = stack_container(s); - ck_epoch_free(&record, &e->epoch_entry, destructor); + ck_epoch_call(&record, &e->epoch_entry, destructor); + smr += ck_epoch_poll(&stack_epoch, &record) == false; } ck_pr_inc_uint(&e_barrier); while (ck_pr_load_uint(&e_barrier) < n_threads); - fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %" PRIu64 "\n\n", + fprintf(stderr, "Deferrals: %lu (%2.2f)\n", smr, (double)smr / PAIRS); + fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %lu\n\n", record.n_peak, (double)record.n_peak / PAIRS * 100, record.n_pending, - record.n_reclamations); + record.n_dispatch); - ck_epoch_purge(&record); + ck_epoch_synchronize(&stack_epoch, &record); ck_pr_inc_uint(&e_barrier); while (ck_pr_load_uint(&e_barrier) < (n_threads << 1)); @@ -146,19 +145,18 @@ main(int argc, char *argv[]) unsigned int i; pthread_t *threads; - if (argc != 4) { - fprintf(stderr, "Usage: stack \n"); + if (argc != 3) { + fprintf(stderr, "Usage: stack \n"); exit(EXIT_FAILURE); } n_threads = atoi(argv[1]); - threshold = atoi(argv[2]); - a.delta = atoi(argv[3]); + a.delta = atoi(argv[2]); a.request = 0; threads = malloc(sizeof(pthread_t) * n_threads); - ck_epoch_init(&stack_epoch, threshold); + ck_epoch_init(&stack_epoch); for (i = 0; i < n_threads; i++) pthread_create(threads + i, NULL, thread, NULL); diff --git a/regressions/ck_ht/benchmark/parallel_bytestring.c b/regressions/ck_ht/benchmark/parallel_bytestring.c index ef60fa5..16a82dc 100644 --- a/regressions/ck_ht/benchmark/parallel_bytestring.c +++ b/regressions/ck_ht/benchmark/parallel_bytestring.c @@ -106,7 +106,7 @@ ht_free(void *p, size_t b, bool r) if (r == true) { /* Destruction requires safe memory reclamation. 
*/ - ck_epoch_free(&epoch_wr, &(--e)->epoch_entry, ht_destroy); + ck_epoch_call(&epoch_wr, &(--e)->epoch_entry, ht_destroy); } else { free(--e); } @@ -123,7 +123,7 @@ static void table_init(void) { - ck_epoch_init(&epoch_ht, 10); + ck_epoch_init(&epoch_ht); ck_epoch_register(&epoch_ht, &epoch_wr); srand48((long int)time(NULL)); if (ck_ht_init(&ht, CK_HT_MODE_BYTESTRING, NULL, &my_allocator, 8, lrand48()) == false) { @@ -200,7 +200,7 @@ table_reset(void) } static void * -ht_reader(void *unused) +reader(void *unused) { size_t i; ck_epoch_record_t epoch_record; @@ -216,7 +216,7 @@ ht_reader(void *unused) ck_epoch_register(&epoch_ht, &epoch_record); for (;;) { j++; - ck_epoch_read_begin(&epoch_record); + ck_epoch_begin(&epoch_ht, &epoch_record); s = rdtsc(); for (i = 0; i < keys_length; i++) { char *r; @@ -235,7 +235,7 @@ ht_reader(void *unused) exit(EXIT_FAILURE); } a += rdtsc() - s; - ck_epoch_read_end(&epoch_record); + ck_epoch_end(&epoch_ht, &epoch_record); n_state = ck_pr_load_int(&state); if (n_state != state_previous) { @@ -335,7 +335,7 @@ main(int argc, char *argv[]) table_init(); for (i = 0; i < (size_t)n_threads; i++) { - if (pthread_create(&readers[i], NULL, ht_reader, NULL) != 0) { + if (pthread_create(&readers[i], NULL, reader, NULL) != 0) { fprintf(stderr, "ERROR: Failed to create thread %zu.\n", i); exit(EXIT_FAILURE); } @@ -352,7 +352,6 @@ main(int argc, char *argv[]) fprintf(stderr, " | Executing SMR test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_write_begin(&epoch_wr); if (table_reset() == false) { fprintf(stderr, "ERROR: Failed to reset hash table.\n"); exit(EXIT_FAILURE); @@ -363,27 +362,23 @@ main(int argc, char *argv[]) d += table_insert(keys[i]) == false; e = rdtsc(); a += e - s; - ck_epoch_write_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); fprintf(stderr, " | Executing replacement test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_write_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) table_replace(keys[i]); e = rdtsc(); a += e - s; - ck_epoch_write_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); fprintf(stderr, " | Executing get test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_read_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) { if (table_get(keys[i]) == NULL) { @@ -393,14 +388,12 @@ main(int argc, char *argv[]) } e = rdtsc(); a += e - s; - ck_epoch_read_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); a = 0; fprintf(stderr, " | Executing removal test..."); for (j = 0; j < r; j++) { - ck_epoch_write_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) table_remove(keys[i]); @@ -409,31 +402,28 @@ main(int argc, char *argv[]) for (i = 0; i < keys_length; i++) table_insert(keys[i]); - ck_epoch_write_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); fprintf(stderr, " | Executing negative look-up test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_read_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) { table_get("\x50\x03\x04\x05\x06\x10"); } e = rdtsc(); a += e - s; - ck_epoch_read_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); ck_epoch_record_t epoch_temporary = epoch_wr; - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); - fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> " - "%u pending, %u peak, %" PRIu64 " reclamations\n\n", - 
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations, - epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations); + fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> " + "%u pending, %u peak, %lu reclamations\n\n", + epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch, + epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch); fprintf(stderr, " ,- READER CONCURRENCY\n"); fprintf(stderr, " | Executing reader test..."); @@ -474,7 +464,7 @@ main(int argc, char *argv[]) while (ck_pr_load_int(&barrier[HT_STATE_STRICT_REPLACEMENT]) != n_threads) ck_pr_stall(); table_reset(); - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n", a / (repeated * keys_length), accumulator[HT_STATE_STRICT_REPLACEMENT] / n_threads); @@ -510,7 +500,7 @@ main(int argc, char *argv[]) ck_pr_stall(); table_reset(); - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n", a / (repeated * keys_length), accumulator[HT_STATE_DELETION] / n_threads); @@ -550,18 +540,18 @@ main(int argc, char *argv[]) while (ck_pr_load_int(&barrier[HT_STATE_REPLACEMENT]) != n_threads) ck_pr_stall(); table_reset(); - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n", a / (repeated * keys_length), accumulator[HT_STATE_REPLACEMENT] / n_threads); ck_pr_inc_int(&barrier[HT_STATE_REPLACEMENT]); epoch_temporary = epoch_wr; - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); - fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> " - "%u pending, %u peak, %" PRIu64 " reclamations\n\n", - epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations, - epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations); + fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> " + "%u pending, %u peak, %lu reclamations\n\n", + epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch, + epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch); return 0; } #else diff --git a/regressions/ck_ht/benchmark/parallel_direct.c b/regressions/ck_ht/benchmark/parallel_direct.c index 94e834c..d39fb3b 100644 --- a/regressions/ck_ht/benchmark/parallel_direct.c +++ b/regressions/ck_ht/benchmark/parallel_direct.c @@ -105,7 +105,7 @@ ht_free(void *p, size_t b, bool r) if (r == true) { /* Destruction requires safe memory reclamation. 
*/ - ck_epoch_free(&epoch_wr, &(--e)->epoch_entry, ht_destroy); + ck_epoch_call(&epoch_wr, &(--e)->epoch_entry, ht_destroy); } else { free(--e); } @@ -133,7 +133,7 @@ static void table_init(void) { - ck_epoch_init(&epoch_ht, 10); + ck_epoch_init(&epoch_ht); ck_epoch_register(&epoch_ht, &epoch_wr); srand48((long int)time(NULL)); if (ck_ht_init(&ht, CK_HT_MODE_DIRECT, hash_function, &my_allocator, 8, lrand48()) == false) { @@ -222,7 +222,7 @@ ht_reader(void *unused) ck_epoch_register(&epoch_ht, &epoch_record); for (;;) { j++; - ck_epoch_read_begin(&epoch_record); + ck_epoch_begin(&epoch_ht, &epoch_record); s = rdtsc(); for (i = 0; i < keys_length; i++) { uintptr_t r; @@ -242,7 +242,7 @@ ht_reader(void *unused) exit(EXIT_FAILURE); } a += rdtsc() - s; - ck_epoch_read_end(&epoch_record); + ck_epoch_end(&epoch_ht, &epoch_record); n_state = ck_pr_load_int(&state); if (n_state != state_previous) { @@ -343,7 +343,6 @@ main(int argc, char *argv[]) fprintf(stderr, " | Executing SMR test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_write_begin(&epoch_wr); if (table_reset() == false) { fprintf(stderr, "ERROR: Failed to reset hash table.\n"); exit(EXIT_FAILURE); @@ -354,27 +353,23 @@ main(int argc, char *argv[]) d += table_insert(keys[i]) == false; e = rdtsc(); a += e - s; - ck_epoch_write_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); fprintf(stderr, " | Executing replacement test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_write_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) table_replace(keys[i]); e = rdtsc(); a += e - s; - ck_epoch_write_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); fprintf(stderr, " | Executing get test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_read_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) { if (table_get(keys[i]) == 0) { @@ -384,14 +379,12 @@ main(int argc, char *argv[]) } e = rdtsc(); a += e - s; - ck_epoch_read_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); a = 0; fprintf(stderr, " | Executing removal test..."); for (j = 0; j < r; j++) { - ck_epoch_write_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) table_remove(keys[i]); @@ -400,31 +393,28 @@ main(int argc, char *argv[]) for (i = 0; i < keys_length; i++) table_insert(keys[i]); - ck_epoch_write_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); fprintf(stderr, " | Executing negative look-up test..."); a = 0; for (j = 0; j < r; j++) { - ck_epoch_read_begin(&epoch_wr); s = rdtsc(); for (i = 0; i < keys_length; i++) { table_get(2); } e = rdtsc(); a += e - s; - ck_epoch_read_end(&epoch_wr); } fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length)); ck_epoch_record_t epoch_temporary = epoch_wr; - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); - fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> " - "%u pending, %u peak, %" PRIu64 " reclamations\n\n", - epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations, - epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations); + fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> " + "%u pending, %u peak, %lu reclamations\n\n", + epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch, + epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch); fprintf(stderr, " ,- READER CONCURRENCY\n"); fprintf(stderr, " | Executing 
reader test..."); @@ -465,7 +455,7 @@ main(int argc, char *argv[]) while (ck_pr_load_int(&barrier[HT_STATE_STRICT_REPLACEMENT]) != n_threads) ck_pr_stall(); table_reset(); - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n", a / (repeated * keys_length), accumulator[HT_STATE_STRICT_REPLACEMENT] / n_threads); @@ -501,7 +491,7 @@ main(int argc, char *argv[]) ck_pr_stall(); table_reset(); - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n", a / (repeated * keys_length), accumulator[HT_STATE_DELETION] / n_threads); @@ -541,18 +531,18 @@ main(int argc, char *argv[]) while (ck_pr_load_int(&barrier[HT_STATE_REPLACEMENT]) != n_threads) ck_pr_stall(); table_reset(); - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n", a / (repeated * keys_length), accumulator[HT_STATE_REPLACEMENT] / n_threads); ck_pr_inc_int(&barrier[HT_STATE_REPLACEMENT]); epoch_temporary = epoch_wr; - ck_epoch_purge(&epoch_wr); + ck_epoch_synchronize(&epoch_ht, &epoch_wr); - fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> " - "%u pending, %u peak, %" PRIu64 " reclamations\n\n", - epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations, - epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations); + fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> " + "%u pending, %u peak, %lu reclamations\n\n", + epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch, + epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch); return 0; } #else diff --git a/src/ck_epoch.c b/src/ck_epoch.c index e101763..e445d94 100644 --- a/src/ck_epoch.c +++ b/src/ck_epoch.c @@ -37,22 +37,114 @@ #include #include +/* + * Only three distinct epoch values are needed. If any thread is in + * a "critical section" then it would have acquired some snapshot (e) + * of the global epoch value (e_g) and set an active flag. Any hazardous + * references will only occur after a full memory barrier. For example, + * assume an initial e_g value of 1, e value of 0 and active value of 0. + * + * ck_epoch_begin(...) + * e = e_g + * active = 1 + * memory_barrier(); + * + * Any serialized reads may observe e = 0 or e = 1 with active = 0, or + * e = 0 or e = 1 with active = 1. The e_g value can only go from 1 + * to 2 if every thread has already observed the value of "1" (or the + * value we are incrementing from). This guarantees us that for any + * given value e_g, any threads with-in critical sections (referred + * to as "active" threads from here on) would have an e value of + * e_g - 1 or e_g. This also means that hazardous references may be + * shared in both e_g - 1 and e_g even if they are logically deleted + * in e_g. + * + * For example, assume all threads have an e value of e_g. Another + * thread may increment to e_g to e_g + 1. Older threads may have + * a reference to an object which is only deleted in e_g + 1. It + * could be that reader threads are executing some hash table look-ups, + * while some other writer thread (which causes epoch counter tick) + * actually deletes the same items that reader threads are looking + * up (this writer thread having an e value of e_g + 1). 
This is possible + * if the writer thread re-observes the epoch after the counter tick. + * + * Pseudo-code for writer: + * ck_epoch_begin() + * ht_delete(x) + * ck_epoch_end() + * ck_epoch_begin() + * ht_delete(x) + * ck_epoch_end() + * + * Pseudo-code for reader: + * for (;;) { + * x = ht_lookup(x) + * ck_pr_inc(&x->value); + * } + * + * Of course, it is also possible for references logically deleted + * at e_g - 1 to still be accessed at e_g as threads are "active" + * at the same time (real-world time) mutating shared objects. + * + * Now, if the epoch counter is ticked to e_g + 1, then no new + * hazardous references could exist to objects logically deleted at + * e_g - 1. The reason for this is that at e_g + 1, all epoch read-side + * critical sections started at e_g - 1 must have been completed. If + * any epoch read-side critical sections at e_g - 1 were still active, + * then we would never increment to e_g + 1 (active != 0 ^ e != e_g). + * Additionally, e_g may still have hazardous references to objects logically + * deleted at e_g - 1 which means objects logically deleted at e_g - 1 cannot + * be deleted at e_g + 1 (since it is valid for active threads to be at e_g or + * e_g + 1 and threads at e_g still require safe memory accesses). + * + * However, at e_g + 2, all active threads must be either at e_g + 1 or + * e_g + 2. Though e_g + 2 may share hazardous references with e_g + 1, + * and e_g + 1 shares hazardous references to e_g, no active threads are + * at e_g or e_g - 1. This means no hazardous references could exist to + * objects deleted at e_g - 1 (at e_g + 2). + * + * To summarize these important points, + * 1) Active threads will always have a value of e_g or e_g - 1. + * 2) Items that are logically deleted at e_g or e_g - 1 cannot be + * physically deleted. + * 3) Objects logically deleted at e_g - 1 can be physically destroyed + * at e_g + 2. In other words, for any current value of the global epoch + * counter e_g, objects logically deleted at e_g can be physically + * deleted at e_g + 3. + * + * Last but not least, if we are at e_g + 2, then no active thread is at + * e_g which means it is safe to apply modulo-3 arithmetic to the e_g value + * in order to re-use e_g to represent the e_g + 3 state. This means it is + * sufficient to represent e_g using only the values 0, 1 or 2. Every time + * a thread re-visits an e_g (which can be determined with a non-empty deferral + * list) it can assume objects in the e_g deferral list involved at least + * three e_g transitions and are thus safe for physical deletion. + * + * Blocking semantics for epoch reclamation have additional restrictions. + * Though we only require three deferral lists, reasonable blocking semantics + * must be able to more gracefully handle bursty write work-loads which could + * easily cause e_g wrap-around if modulo-3 arithmetic is used. This allows for + * easy-to-trigger live-lock situations. The work-around is + * to not apply modulo arithmetic to e_g but only to deferral list + * indexing. 
+ */ +#define CK_EPOCH_GRACE 3U + enum { - CK_EPOCH_USED = 0, - CK_EPOCH_FREE = 1 + CK_EPOCH_STATE_USED = 0, + CK_EPOCH_STATE_FREE = 1 }; CK_STACK_CONTAINER(struct ck_epoch_record, record_next, ck_epoch_record_container) CK_STACK_CONTAINER(struct ck_epoch_entry, stack_entry, ck_epoch_entry_container) void -ck_epoch_init(struct ck_epoch *global, unsigned int threshold) +ck_epoch_init(struct ck_epoch *global) { ck_stack_init(&global->records); global->epoch = 1; global->n_free = 0; - global->threshold = threshold; ck_pr_fence_store(); return; } @@ -62,7 +154,7 @@ ck_epoch_recycle(struct ck_epoch *global) { struct ck_epoch_record *record; ck_stack_entry_t *cursor; - unsigned int status; + unsigned int state; if (ck_pr_load_uint(&global->n_free) == 0) return (NULL); @@ -70,10 +162,10 @@ ck_epoch_recycle(struct ck_epoch *global) CK_STACK_FOREACH(&global->records, cursor) { record = ck_epoch_record_container(cursor); - if (ck_pr_load_uint(&record->status) == CK_EPOCH_FREE) { + if (ck_pr_load_uint(&record->state) == CK_EPOCH_STATE_FREE) { ck_pr_fence_load(); - status = ck_pr_fas_uint(&record->status, CK_EPOCH_USED); - if (status == CK_EPOCH_FREE) { + state = ck_pr_fas_uint(&record->state, CK_EPOCH_STATE_USED); + if (state == CK_EPOCH_STATE_FREE) { ck_pr_dec_uint(&global->n_free); return record; } @@ -88,14 +180,12 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record) { size_t i; - record->status = CK_EPOCH_USED; + record->state = CK_EPOCH_STATE_USED; record->active = 0; record->epoch = 0; - record->delta = 0; - record->n_pending = 0; + record->n_dispatch = 0; record->n_peak = 0; - record->n_reclamations = 0; - record->global = global; + record->n_pending = 0; for (i = 0; i < CK_EPOCH_LENGTH; i++) ck_stack_init(&record->pending[i]); @@ -112,151 +202,148 @@ ck_epoch_unregister(struct ck_epoch_record *record) record->active = 0; record->epoch = 0; - record->delta = 0; - record->n_pending = 0; + record->n_dispatch = 0; record->n_peak = 0; - record->n_reclamations = 0; + record->n_pending = 0; for (i = 0; i < CK_EPOCH_LENGTH; i++) ck_stack_init(&record->pending[i]); ck_pr_fence_store(); - ck_pr_store_uint(&record->status, CK_EPOCH_FREE); - ck_pr_inc_uint(&record->global->n_free); + ck_pr_store_uint(&record->state, CK_EPOCH_STATE_FREE); return; } -void -ck_epoch_tick(struct ck_epoch *global, struct ck_epoch_record *record) +static struct ck_epoch_record * +ck_epoch_scan(struct ck_epoch *global, struct ck_epoch_record *cr, unsigned int epoch) { - struct ck_epoch_record *c_record; ck_stack_entry_t *cursor; - unsigned int g_epoch = ck_pr_load_uint(&global->epoch); - CK_STACK_FOREACH(&global->records, cursor) { - c_record = ck_epoch_record_container(cursor); - if (ck_pr_load_uint(&c_record->status) == CK_EPOCH_FREE || - c_record == record) + if (cr == NULL) { + cursor = CK_STACK_FIRST(&global->records); + } else { + cursor = &cr->record_next; + } + + while (cursor != NULL) { + unsigned int state; + + cr = ck_epoch_record_container(cursor); + + state = ck_pr_load_uint(&cr->state); + if (state & CK_EPOCH_STATE_FREE) continue; - if (ck_pr_load_uint(&c_record->active) != 0 && - ck_pr_load_uint(&c_record->epoch) != g_epoch) - return; + if (ck_pr_load_uint(&cr->active) != 0 && + ck_pr_load_uint(&cr->epoch) != epoch) + return cr; + + cursor = CK_STACK_NEXT(cursor); } - /* - * If we have multiple writers, it is much easier to starve - * reclamation if we loop through the epoch domain. 
It may - * be worth it to add an SPMC variant to ck_epoch that relies - * on atomic increment operations instead. - */ - ck_pr_cas_uint(&global->epoch, g_epoch, (g_epoch + 1) & (CK_EPOCH_LENGTH - 1)); - return; + return NULL; } -bool -ck_epoch_reclaim(struct ck_epoch_record *record) +static void +ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e) { - struct ck_epoch *global = record->global; - unsigned int g_epoch = ck_pr_load_uint(&global->epoch); - unsigned int epoch = record->epoch; + unsigned int epoch = e & (CK_EPOCH_LENGTH - 1); ck_stack_entry_t *next, *cursor; + unsigned int i = 0; - if (epoch == g_epoch) - return false; - - /* - * This means all threads with a potential reference to a - * hazard pointer will have a view as new as or newer than - * the calling thread. No active reference should exist to - * any object in the record's pending list. - */ - CK_STACK_FOREACH_SAFE(&record->pending[g_epoch], cursor, next) { + CK_STACK_FOREACH_SAFE(&record->pending[epoch], cursor, next) { struct ck_epoch_entry *entry = ck_epoch_entry_container(cursor); - entry->destroy(entry); - record->n_pending--; - record->n_reclamations++; + entry->function(entry); + i++; } - ck_stack_init(&record->pending[g_epoch]); - ck_pr_store_uint(&record->epoch, g_epoch); - record->delta = 0; - return true; + if (record->n_pending > record->n_peak) + record->n_peak = record->n_pending; + + record->n_dispatch += i; + record->n_pending -= i; + ck_stack_init(&record->pending[epoch]); + return; } +/* + * This function must not be called with-in read section. + */ void -ck_epoch_write_begin(struct ck_epoch_record *record) +ck_epoch_barrier(struct ck_epoch *global, struct ck_epoch_record *record) { - struct ck_epoch *global = record->global; - - ck_pr_store_uint(&record->active, record->active + 1); + struct ck_epoch_record *cr; + unsigned int delta, epoch, goal, i; /* - * In the case of recursive write sections, avoid ticking - * over global epoch. + * Guarantee any mutations previous to the barrier will be made visible + * with respect to epoch snapshots we will read. */ - if (record->active > 1) - return; - ck_pr_fence_memory(); - for (;;) { - /* - * Reclaim deferred objects if possible and - * acquire a new snapshot of the global epoch. - */ - if (ck_epoch_reclaim(record) == true) - break; + + delta = epoch = ck_pr_load_uint(&global->epoch); + goal = epoch + CK_EPOCH_GRACE; + + for (i = 0, cr = NULL; i < CK_EPOCH_GRACE; cr = NULL, i++) { + /* Determine whether all threads have observed the current epoch. */ + while (cr = ck_epoch_scan(global, cr, delta), cr != NULL) + ck_pr_stall(); /* - * If we are above the global epoch record threshold, - * attempt to tick over the global epoch counter. + * Increment current epoch. CAS semantics are used to eliminate + * increment operations for synchronization that occurs for the + * same global epoch value snapshot. + * + * If we can guarantee there will only be one active barrier + * or epoch tick at a given time, then it is sufficient to + * use an increment operation. In a multi-barrier workload, + * however, it is possible to overflow the epoch value if we + * apply modulo-3 arithmetic. */ - if (++record->delta >= global->threshold) { - record->delta = 0; - ck_epoch_tick(global, record); - continue; - } + ck_pr_cas_uint_value(&global->epoch, delta, delta + 1, &delta); - break; + /* Right now, epoch overflow is handled as an edge case. 
*/ + if ((goal > epoch) & (delta > goal)) + break; } + /* + * As the synchronize operation is non-blocking, it is possible other + * writers have already observed three or more epoch generations + * relative to the generation the caller has observed. In this case, + * it is safe to assume we are also in a grace period and are able to + * dispatch all calls across all lists. + */ + for (epoch = 0; epoch < CK_EPOCH_LENGTH; epoch++) + ck_epoch_dispatch(record, epoch); + + record->epoch = delta; return; } void -ck_epoch_free(struct ck_epoch_record *record, - ck_epoch_entry_t *entry, - ck_epoch_destructor_t destroy) +ck_epoch_synchronize(struct ck_epoch *global, struct ck_epoch_record *record) { - unsigned int epoch = ck_pr_load_uint(&record->epoch); - struct ck_epoch *global = record->global; - - entry->destroy = destroy; - ck_stack_push_spnc(&record->pending[epoch], &entry->stack_entry); - record->n_pending += 1; - - if (record->n_pending > record->n_peak) - record->n_peak = record->n_pending; - - if (record->n_pending >= global->threshold && ck_epoch_reclaim(record) == false) - ck_epoch_tick(global, record); + ck_epoch_barrier(global, record); return; } -void -ck_epoch_purge(struct ck_epoch_record *record) +bool +ck_epoch_poll(struct ck_epoch *global, struct ck_epoch_record *record) { - ck_backoff_t backoff = CK_BACKOFF_INITIALIZER; + unsigned int epoch = ck_pr_load_uint(&global->epoch); + unsigned int snapshot; + struct ck_epoch_record *cr = NULL; - while (record->n_pending > 0) { - ck_epoch_reclaim(record); - ck_epoch_tick(record->global, record); - if (record->n_pending > 0) - ck_backoff_gb(&backoff); - } + cr = ck_epoch_scan(global, cr, epoch); + if (cr != NULL) + return false; - return; + ck_pr_cas_uint_value(&global->epoch, epoch, epoch + 1, &snapshot); + ck_epoch_dispatch(record, epoch + 1); + record->epoch = snapshot; + return true; }
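
A note for readers of this patch: below is a minimal sketch of how the reworked interface fits together, mirroring the usage in the regression tests above — a per-thread ck_epoch_record_t, ck_epoch_begin()/ck_epoch_end() around read-side critical sections, and ck_epoch_call() plus ck_epoch_poll() for deferred destruction. The struct object type, the object_* helpers and the global_epoch name are illustrative only and are not part of the patch.

#include <stdlib.h>

#include <ck_epoch.h>
#include <ck_pr.h>

/* Hypothetical object protected by epoch-based reclamation. */
struct object {
	int value;
	ck_epoch_entry_t epoch_entry;
};
CK_EPOCH_CONTAINER(struct object, epoch_entry, object_container)

static ck_epoch_t global_epoch;

/* Matches the new ck_epoch_cb_t signature. */
static void
object_destroy(ck_epoch_entry_t *e)
{

	free(object_container(e));
	return;
}

/* Called once, before any thread registers a record. */
static void
setup(void)
{

	ck_epoch_init(&global_epoch);
	return;
}

/* Each participating thread registers its own record exactly once. */
static void
thread_setup(ck_epoch_record_t *record)
{

	ck_epoch_register(&global_epoch, record);
	return;
}

/* Read-side critical section: objects observed here remain safe until end. */
static int
reader(ck_epoch_record_t *record, struct object **shared)
{
	struct object *o;
	int value;

	ck_epoch_begin(&global_epoch, record);
	o = ck_pr_load_ptr(shared);
	value = o->value;
	ck_epoch_end(&global_epoch, record);
	return value;
}

/* Writer: defer destruction, then attempt a non-blocking reclamation pass. */
static void
writer_retire(ck_epoch_record_t *record, struct object *o)
{

	ck_epoch_call(record, &o->epoch_entry, object_destroy);
	(void)ck_epoch_poll(&global_epoch, record);
	return;
}

/* Tear-down: block until deferred callbacks on this record are dispatched. */
static void
writer_quiesce(ck_epoch_record_t *record)
{

	ck_epoch_barrier(&global_epoch, record);
	return;
}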
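The split between the non-blocking and blocking paths follows the pattern used by ck_epoch_poll.c above: defer with ck_epoch_call(), periodically attempt ck_epoch_poll(), and only pay for a full grace period with ck_epoch_barrier() (or ck_epoch_synchronize()) once a batch of work is complete. A condensed sketch building on the previous example; retire_many() and the poll interval of 1024 are illustrative.

#include <stddef.h>

/*
 * Retire a batch of objects: destruction is deferred per object, a cheap
 * non-blocking reclamation pass is attempted periodically, and a blocking
 * grace period is only observed once at the end of the batch.
 */
static void
retire_many(ck_epoch_record_t *record, struct object **batch, size_t n)
{
	size_t i;

	for (i = 0; i < n; i++) {
		ck_epoch_call(record, &batch[i]->epoch_entry, object_destroy);

		/*
		 * ck_epoch_poll returns false when other active records
		 * prevent the global epoch from advancing; the entries then
		 * simply remain on this record's deferral lists.
		 */
		if ((i & 1023) == 0)
			(void)ck_epoch_poll(&global_epoch, record);
	}

	/* Dispatch everything still deferred on this record before returning. */
	ck_epoch_barrier(&global_epoch, record);
	return;
}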