diff --git a/include/ck_epoch.h b/include/ck_epoch.h
index e7ce5bc..453fa23 100644
--- a/include/ck_epoch.h
+++ b/include/ck_epoch.h
@@ -92,7 +92,8 @@ struct ck_epoch_record {
 	} local CK_CC_CACHELINE;
 	unsigned int n_pending;
 	unsigned int n_peak;
-	unsigned long n_dispatch;
+	unsigned int n_dispatch;
+	void *ct;
 	ck_stack_t pending[CK_EPOCH_LENGTH];
 	ck_stack_entry_t record_next;
 } CK_CC_CACHELINE;
@@ -112,6 +113,13 @@ typedef struct ck_epoch ck_epoch_t;
 void _ck_epoch_addref(ck_epoch_record_t *, ck_epoch_section_t *);
-void _ck_epoch_delref(ck_epoch_record_t *, ck_epoch_section_t *);
+bool _ck_epoch_delref(ck_epoch_record_t *, ck_epoch_section_t *);
+
+CK_CC_FORCE_INLINE static void *
+ck_epoch_record_ct(const ck_epoch_record_t *record)
+{
+
+	return ck_pr_load_ptr(&record->ct);
+}
 
 /*
  * Marks the beginning of an epoch-protected section.
  */
@@ -160,9 +168,10 @@ ck_epoch_begin(ck_epoch_record_t *record, ck_epoch_section_t *section)
 }
 
 /*
- * Marks the end of an epoch-protected section.
+ * Marks the end of an epoch-protected section. Returns true if no more
+ * sections exist for the caller.
  */
-CK_CC_FORCE_INLINE static void
+CK_CC_FORCE_INLINE static bool
 ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
 {
 
@@ -170,15 +179,19 @@ ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
 	ck_pr_store_uint(&record->active, record->active - 1);
 
 	if (section != NULL)
-		_ck_epoch_delref(record, section);
+		return _ck_epoch_delref(record, section);
 
-	return;
+	return record->active == 0;
 }
 
 /*
  * Defers the execution of the function pointed to by the "cb"
  * argument until an epoch counter loop. This allows for a
  * non-blocking deferral.
+ *
+ * We can get away without a fence here due to the monotonic nature
+ * of the epoch counter. Worst case, this will result in some delays
+ * before object destruction.
  */
 CK_CC_FORCE_INLINE static void
 ck_epoch_call(ck_epoch_record_t *record,
@@ -195,13 +208,74 @@ ck_epoch_call(ck_epoch_record_t *record,
 	return;
 }
 
+/*
+ * Same as ck_epoch_call, but allows for records to be shared and is reentrant.
+ */
+CK_CC_FORCE_INLINE static void
+ck_epoch_call_strict(ck_epoch_record_t *record,
+    ck_epoch_entry_t *entry,
+    ck_epoch_cb_t *function)
+{
+	struct ck_epoch *epoch = record->global;
+	unsigned int e = ck_pr_load_uint(&epoch->epoch);
+	unsigned int offset = e & (CK_EPOCH_LENGTH - 1);
+
+	ck_pr_inc_uint(&record->n_pending);
+	entry->function = function;
+
+	/* Store fence is implied by push operation. */
+	ck_stack_push_upmc(&record->pending[offset], &entry->stack_entry);
+	return;
+}
+
+/*
+ * This callback is used for synchronize_wait to allow for custom blocking
+ * behavior.
+ */
+typedef void ck_epoch_wait_cb_t(ck_epoch_t *, ck_epoch_record_t *,
+    void *);
+
+/*
+ * Return latest epoch value. This operation provides load ordering.
+ */
+CK_CC_FORCE_INLINE static unsigned int
+ck_epoch_value(const ck_epoch_t *ep)
+{
+
+	ck_pr_fence_load();
+	return ck_pr_load_uint(&ep->epoch);
+}
+
 void ck_epoch_init(ck_epoch_t *);
-ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
-void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *);
+
+/*
+ * Attempts to recycle an unused epoch record. If one is successfully
+ * allocated, the record context pointer is also updated.
+ */
+ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *, void *);
+
+/*
+ * Registers an epoch record. An optional context pointer may be passed that
+ * is retrievable with ck_epoch_record_ct.
+ */
+void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *, void *);
+
+/*
+ * Marks a record as available for re-use by a subsequent recycle operation.
+ * Note that the record cannot be physically destroyed.
+ */
 void ck_epoch_unregister(ck_epoch_record_t *);
+
 bool ck_epoch_poll(ck_epoch_record_t *);
 void ck_epoch_synchronize(ck_epoch_record_t *);
+void ck_epoch_synchronize_wait(ck_epoch_t *, ck_epoch_wait_cb_t *, void *);
 void ck_epoch_barrier(ck_epoch_record_t *);
+void ck_epoch_barrier_wait(ck_epoch_record_t *, ck_epoch_wait_cb_t *, void *);
+
+/*
+ * Reclaim entries associated with a record. This is safe to call only on
+ * the caller's record or records that are using call_strict.
+ */
 void ck_epoch_reclaim(ck_epoch_record_t *);
 
 #endif /* CK_EPOCH_H */
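For orientation, a minimal sketch of how the new context-pointer plumbing might
be used; the struct reader type and helper names below are hypothetical and not
part of this patch:

	#include <ck_epoch.h>

	/* Hypothetical per-thread state hung off an epoch record. */
	struct reader {
		ck_epoch_record_t record;
		unsigned int id;
	};

	static void
	reader_init(ck_epoch_t *global, struct reader *r, unsigned int id)
	{

		r->id = id;
		/* The pointer is later retrievable via ck_epoch_record_ct. */
		ck_epoch_register(global, &r->record, r);
	}

	static unsigned int
	reader_id(const ck_epoch_record_t *record)
	{
		struct reader *r = ck_epoch_record_ct(record);

		return r->id;
	}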
diff --git a/regressions/ck_epoch/validate/ck_epoch_call.c b/regressions/ck_epoch/validate/ck_epoch_call.c
index 29e0df8..1c274e0 100644
--- a/regressions/ck_epoch/validate/ck_epoch_call.c
+++ b/regressions/ck_epoch/validate/ck_epoch_call.c
@@ -37,6 +37,7 @@ static void
 cb(ck_epoch_entry_t *p)
 {
 
+	/* Test that we can reregister the callback. */
 	if (counter == 0)
 		ck_epoch_call(&record[1], p, cb);
 
@@ -50,15 +51,22 @@ int
 main(void)
 {
 	ck_epoch_entry_t entry;
+	ck_epoch_entry_t another;
 
-	ck_epoch_register(&epoch, &record[0]);
-	ck_epoch_register(&epoch, &record[1]);
+	ck_epoch_register(&epoch, &record[0], NULL);
+	ck_epoch_register(&epoch, &record[1], NULL);
 
 	ck_epoch_call(&record[1], &entry, cb);
 	ck_epoch_barrier(&record[1]);
 	ck_epoch_barrier(&record[1]);
-	if (counter != 2)
-		ck_error("Expected counter value 2, read %u.\n", counter);
+
+	/* Make sure that strict works. */
+	ck_epoch_call_strict(&record[1], &entry, cb);
+	ck_epoch_call_strict(&record[1], &another, cb);
+	ck_epoch_barrier(&record[1]);
+
+	if (counter != 4)
+		ck_error("Expected counter value 4, read %u.\n", counter);
 
 	return 0;
 }
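The regression above drives ck_epoch_call_strict from a single thread; the
point of the strict variant is that the target record may be shared, since
pending entries are pushed with ck_stack_push_upmc. A hedged sketch of shared
use, with shared_record and the node layout invented for illustration:

	#include <stdlib.h>
	#include <ck_epoch.h>

	/* One record deliberately shared by many threads. */
	static ck_epoch_record_t shared_record;

	struct node {
		ck_epoch_entry_t epoch_entry;
		int value;
	};

	static void
	node_destroy(ck_epoch_entry_t *e)
	{

		/* epoch_entry is the first member, so this cast is valid. */
		free((struct node *)e);
	}

	static void
	node_retire(struct node *n)
	{

		/* Safe even if other threads push to the record concurrently. */
		ck_epoch_call_strict(&shared_record, &n->epoch_entry, node_destroy);
	}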
diff --git a/regressions/ck_epoch/validate/ck_epoch_poll.c b/regressions/ck_epoch/validate/ck_epoch_poll.c
index aec6dd0..4e8769b 100644
--- a/regressions/ck_epoch/validate/ck_epoch_poll.c
+++ b/regressions/ck_epoch/validate/ck_epoch_poll.c
@@ -89,7 +89,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record CK_CC_CACHELINE;
 	ck_stack_entry_t *cursor, *n;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -141,7 +141,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record;
 	ck_stack_entry_t *s;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -191,7 +191,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_barrier(&record);
 
 	if (tid == 0) {
-		fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n   Reclamations: %lu\n\n",
+		fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n   Reclamations: %u\n\n",
 		    record.n_peak,
 		    (double)record.n_peak / ((double)PAIRS_S * ITERATE_S) * 100,
 		    record.n_dispatch);
diff --git a/regressions/ck_epoch/validate/ck_epoch_section.c b/regressions/ck_epoch/validate/ck_epoch_section.c
index 12bcca1..7b76d1c 100644
--- a/regressions/ck_epoch/validate/ck_epoch_section.c
+++ b/regressions/ck_epoch/validate/ck_epoch_section.c
@@ -46,8 +46,8 @@ setup_test(void)
 {
 
 	ck_epoch_init(&epc);
-	ck_epoch_register(&epc, &record);
-	ck_epoch_register(&epc, &record2);
+	ck_epoch_register(&epc, &record, NULL);
+	ck_epoch_register(&epc, &record2, NULL);
 	cleanup_calls = 0;
 
 	return;
@@ -88,7 +88,8 @@
 	ck_epoch_begin(&record, &section);
 	ck_epoch_call(&record, &entry, cleanup);
 	assert(cleanup_calls == 0);
-	ck_epoch_end(&record, &section);
+	if (ck_epoch_end(&record, &section) == false)
+		ck_error("expected no more sections");
 	ck_epoch_barrier(&record);
 	assert(cleanup_calls == 1);
 
@@ -157,7 +158,7 @@ reader_work(void *arg)
 	ck_epoch_section_t section;
 	struct obj *o;
 
-	ck_epoch_register(&epc, &local_record);
+	ck_epoch_register(&epc, &local_record, NULL);
 
 	o = (struct obj *)arg;
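The section test checks the new boolean result of ck_epoch_end directly. In
application code, a true return means the caller holds no further active
sections, which is a natural point to attempt non-blocking reclamation; a
sketch under that assumption (the function name is illustrative):

	#include <ck_epoch.h>

	static void
	read_and_maybe_reclaim(ck_epoch_record_t *record,
	    ck_epoch_section_t *section)
	{

		ck_epoch_begin(record, section);
		/* ... dereference epoch-protected objects ... */

		/*
		 * true indicates no active sections remain on this record,
		 * so the caller is outside any read section and may poll.
		 */
		if (ck_epoch_end(record, section) == true)
			(void)ck_epoch_poll(record);
	}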
diff --git a/regressions/ck_epoch/validate/ck_epoch_section_2.c b/regressions/ck_epoch/validate/ck_epoch_section_2.c
index aed3661..daf6738 100644
--- a/regressions/ck_epoch/validate/ck_epoch_section_2.c
+++ b/regressions/ck_epoch/validate/ck_epoch_section_2.c
@@ -64,7 +64,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	record = malloc(sizeof *record);
 	assert(record != NULL);
 
-	ck_epoch_register(&epoch, record);
+	ck_epoch_register(&epoch, record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -133,7 +133,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record;
 	unsigned long iterations = 0;
 
-	ck_epoch_register(&epoch, &record);
+	ck_epoch_register(&epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
diff --git a/regressions/ck_epoch/validate/ck_epoch_synchronize.c b/regressions/ck_epoch/validate/ck_epoch_synchronize.c
index a03a4f7..c278334 100644
--- a/regressions/ck_epoch/validate/ck_epoch_synchronize.c
+++ b/regressions/ck_epoch/validate/ck_epoch_synchronize.c
@@ -91,7 +91,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	ck_stack_entry_t *n;
 	unsigned int i;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -148,7 +148,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record;
 	ck_stack_entry_t *s;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -204,7 +204,7 @@ write_thread(void *unused CK_CC_UNUSED)
 		ck_epoch_synchronize(&record);
 
 	if (tid == 0) {
-		fprintf(stderr, "[W] Peak: %u (%2.2f%%)\n   Reclamations: %lu\n\n",
+		fprintf(stderr, "[W] Peak: %u (%2.2f%%)\n   Reclamations: %u\n\n",
 		    record.n_peak,
 		    (double)record.n_peak / ((double)PAIRS_S * ITERATE_S) * 100,
 		    record.n_dispatch);
diff --git a/regressions/ck_epoch/validate/ck_stack.c b/regressions/ck_epoch/validate/ck_stack.c
index fc50228..6d493e1 100644
--- a/regressions/ck_epoch/validate/ck_stack.c
+++ b/regressions/ck_epoch/validate/ck_stack.c
@@ -81,7 +81,7 @@ thread(void *unused CK_CC_UNUSED)
 	unsigned long smr = 0;
 	unsigned int i;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -118,7 +118,7 @@ thread(void *unused CK_CC_UNUSED)
 	while (ck_pr_load_uint(&e_barrier) < n_threads);
 
 	fprintf(stderr, "Deferrals: %lu (%2.2f)\n", smr, (double)smr / PAIRS);
-	fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %lu\n\n",
+	fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %u\n\n",
 	    record.n_peak,
 	    (double)record.n_peak / PAIRS * 100,
 	    record.n_pending,
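The format-string changes above follow from n_dispatch narrowing from unsigned
long to unsigned int; out-of-tree consumers that print these counters need the
same adjustment. An illustrative (not in-tree) helper:

	#include <stdio.h>
	#include <ck_epoch.h>

	static void
	print_record_stats(const ck_epoch_record_t *record)
	{

		/* All three counters are now unsigned int. */
		fprintf(stderr, "peak: %u pending: %u dispatched: %u\n",
		    record->n_peak, record->n_pending, record->n_dispatch);
	}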
diff --git a/regressions/ck_epoch/validate/torture.c b/regressions/ck_epoch/validate/torture.c
index fb3b1f4..f49d412 100644
--- a/regressions/ck_epoch/validate/torture.c
+++ b/regressions/ck_epoch/validate/torture.c
@@ -119,7 +119,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	record = malloc(sizeof *record);
 	assert(record != NULL);
 
-	ck_epoch_register(&epoch, record);
+	ck_epoch_register(&epoch, record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -151,7 +151,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	record = malloc(sizeof *record);
 	assert(record != NULL);
 
-	ck_epoch_register(&epoch, record);
+	ck_epoch_register(&epoch, record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
We - * can actually avoid an addtional scan step at this - * point. - */ - break; - } } /* @@ -480,10 +508,19 @@ reload: * However, if non-temporal instructions are used, full barrier * semantics are necessary. */ +leave: ck_pr_fence_memory(); return; } +void +ck_epoch_synchronize(struct ck_epoch_record *record) +{ + + ck_epoch_synchronize_wait(record->global, NULL, NULL); + return; +} + void ck_epoch_barrier(struct ck_epoch_record *record) { @@ -493,6 +530,16 @@ ck_epoch_barrier(struct ck_epoch_record *record) return; } +void +ck_epoch_barrier_wait(struct ck_epoch_record *record, ck_epoch_wait_cb_t *cb, + void *ct) +{ + + ck_epoch_synchronize_wait(record->global, cb, ct); + ck_epoch_reclaim(record); + return; +} + /* * It may be worth it to actually apply these deferral semantics to an epoch * that was observed at ck_epoch_call time. The problem is that the latter