diff --git a/include/ck_epoch.h b/include/ck_epoch.h index c223d4c..a6f6b53 100644 --- a/include/ck_epoch.h +++ b/include/ck_epoch.h @@ -55,12 +55,16 @@ enum { struct ck_epoch; struct ck_epoch_record { unsigned int active; - unsigned int status; unsigned int epoch; ck_stack_t pending[CK_EPOCH_LENGTH]; + unsigned int n_pending; + unsigned int status; unsigned int delta; + unsigned int n_peak; + uint64_t n_reclamations; struct ck_epoch *global; ck_stack_entry_t record_next; + } CK_CC_CACHELINE; typedef struct ck_epoch_record ck_epoch_record_t; @@ -125,6 +129,9 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record) record->active = 0; record->epoch = 0; record->delta = 0; + record->n_pending = 0; + record->n_peak = 0; + record->n_reclamations = 0; record->global = global; for (i = 0; i < CK_EPOCH_LENGTH; i++) @@ -146,7 +153,7 @@ ck_epoch_unregister(struct ck_epoch_record *record) } CK_CC_INLINE static void -ck_epoch_update(struct ck_epoch *global, struct ck_epoch_record *record) +ck_epoch_tick(struct ck_epoch *global, struct ck_epoch_record *record) { struct ck_epoch_record *c_record; ck_stack_entry_t *cursor; @@ -167,55 +174,51 @@ ck_epoch_update(struct ck_epoch *global, struct ck_epoch_record *record) return; } -CK_CC_INLINE static void -ck_epoch_activate(struct ck_epoch_record *record) +CK_CC_INLINE static bool +ck_epoch_reclaim(struct ck_epoch_record *record) { + struct ck_epoch *global = record->global; + unsigned int g_epoch = ck_pr_load_uint(&global->epoch); + unsigned int epoch = record->epoch; + ck_stack_entry_t *next, *cursor; + + if (epoch == g_epoch) + return false; + + /* + * This means all threads with a potential reference to a + * hazard pointer will have a view as new as or newer than + * the calling thread. No active reference should exist to + * any object in the record's pending list. + */ + CK_STACK_FOREACH_SAFE(&record->pending[epoch], cursor, next) { + global->destroy(cursor); + record->n_pending--; + record->n_reclamations++; + } - ck_pr_store_uint(&record->active, 1); - ck_pr_fence_store(); - return; -} - -CK_CC_INLINE static void -ck_epoch_deactivate(struct ck_epoch_record *record) -{ + ck_stack_init(&record->pending[epoch]); + record->epoch = g_epoch & (CK_EPOCH_LENGTH - 1); + record->delta = 0; - ck_pr_fence_store(); - ck_pr_store_uint(&record->active, 0); - return; + return true; } CK_CC_INLINE static void -ck_epoch_start(struct ck_epoch_record *record) +ck_epoch_write_begin(struct ck_epoch_record *record) { struct ck_epoch *global = record->global; - unsigned int g_epoch; + + ck_pr_store_uint(&record->active, 1); + ck_pr_fence_store(); for (;;) { - g_epoch = ck_pr_load_uint(&global->epoch); - if (record->epoch != g_epoch) { - ck_stack_entry_t *next, *cursor; - unsigned int epoch = record->epoch & (CK_EPOCH_LENGTH - 1); - - /* - * This means all threads with a potential reference to a - * hazard pointer will have a view as new as or newer than - * the calling thread. No active reference should exist to - * any object in the record's pending list. - */ - CK_STACK_FOREACH_SAFE(&record->pending[epoch], cursor, next) - global->destroy(cursor); - - ck_stack_init(&record->pending[epoch]); - - ck_pr_store_uint(&record->epoch, g_epoch); - record->delta = 0; + if (ck_epoch_reclaim(record) == true) break; - } if (++record->delta >= global->threshold) { record->delta = 0; - ck_epoch_update(global, record); + ck_epoch_tick(global, record); continue; } @@ -226,45 +229,53 @@ ck_epoch_start(struct ck_epoch_record *record) } CK_CC_INLINE static void -ck_epoch_stop(struct ck_epoch_record *record CK_CC_UNUSED) +ck_epoch_read_begin(struct ck_epoch_record *record) { + ck_pr_store_uint(&record->active, 1); + ck_pr_fence_store(); return; } CK_CC_INLINE static void -ck_epoch_begin(struct ck_epoch_record *record) +ck_epoch_end(struct ck_epoch_record *record) { - ck_epoch_activate(record); - ck_epoch_start(record); + ck_pr_fence_store(); + ck_pr_store_uint(&record->active, 0); return; } CK_CC_INLINE static void -ck_epoch_end(struct ck_epoch_record *record) +ck_epoch_free(struct ck_epoch_record *record, ck_stack_entry_t *entry) { + struct ck_epoch *global = record->global; + unsigned int epoch = record->epoch; - ck_epoch_deactivate(record); - return; -} + ck_stack_push_spnc(&record->pending[epoch], entry); + record->n_pending += 1; -CK_CC_INLINE static void -ck_epoch_flush(struct ck_epoch_record *record) -{ + if (record->n_pending > record->n_peak) + record->n_peak = record->n_pending; + + if (record->n_pending >= global->threshold && ck_epoch_reclaim(record) == false) + ck_epoch_tick(global, record); - ck_epoch_update(record->global, record); - ck_epoch_start(record); return; } CK_CC_INLINE static void -ck_epoch_free(struct ck_epoch_record *record, ck_stack_entry_t *entry) +ck_epoch_purge(struct ck_epoch_record *record) { - unsigned int epoch = ck_pr_load_uint(&record->epoch) & (CK_EPOCH_LENGTH - 1); + ck_backoff_t backoff = CK_BACKOFF_INITIALIZER; + + while (record->n_pending > 0) { + if (ck_epoch_reclaim(record) == false) + ck_epoch_tick(record->global, record); + else if (record->n_pending > 0) + ck_backoff_gb(&backoff); + } - ck_stack_push_spnc(&record->pending[epoch], entry); - record->delta++; return; } diff --git a/regressions/ck_epoch/validate/ck_stack.c b/regressions/ck_epoch/validate/ck_stack.c index 6f76713..b565525 100644 --- a/regressions/ck_epoch/validate/ck_stack.c +++ b/regressions/ck_epoch/validate/ck_stack.c @@ -50,7 +50,7 @@ static unsigned int barrier; static unsigned int e_barrier; #ifndef PAIRS -#define PAIRS 1000000 +#define PAIRS 5000000 #endif struct node { @@ -95,11 +95,11 @@ thread(void *unused CK_CC_UNUSED) while (ck_pr_load_uint(&barrier) < n_threads); for (i = 0; i < PAIRS; i++) { - ck_epoch_begin(&record); + ck_epoch_write_begin(&record); ck_stack_push_upmc(&stack, &entry[i]->stack_entry); ck_epoch_end(&record); - ck_epoch_begin(&record); + ck_epoch_write_begin(&record); s = ck_stack_pop_upmc(&stack); ck_epoch_end(&record); @@ -110,6 +110,20 @@ thread(void *unused CK_CC_UNUSED) ck_pr_inc_uint(&e_barrier); while (ck_pr_load_uint(&e_barrier) < n_threads); + fprintf(stderr, "Peak: %u\nReclamations: %" PRIu64 "\n\n", + record.n_peak, record.n_reclamations); + + ck_epoch_purge(&record); + + ck_pr_inc_uint(&e_barrier); + while (ck_pr_load_uint(&e_barrier) < (n_threads << 1)); + + if (record.n_pending != 0) { + fprintf(stderr, "ERROR: %u pending, expecting none.\n", + record.n_pending); + exit(EXIT_FAILURE); + } + return (NULL); }