From 8580ccc5801cc4ad7183b73f5f743fa1cafe59bd Mon Sep 17 00:00:00 2001
From: Samy Al Bahra
Date: Sat, 8 Apr 2017 15:05:46 -0400
Subject: [PATCH] ck_epoch: introduce synchronize_wait that allows blocking
 synchronize operation.

---
 include/ck_epoch.h                            | 41 ++++++++--
 regressions/ck_epoch/validate/ck_epoch_call.c |  4 +-
 regressions/ck_epoch/validate/ck_epoch_poll.c |  4 +-
 .../ck_epoch/validate/ck_epoch_section.c      |  6 +-
 .../ck_epoch/validate/ck_epoch_section_2.c    |  4 +-
 .../ck_epoch/validate/ck_epoch_synchronize.c  |  4 +-
 regressions/ck_epoch/validate/ck_stack.c      |  2 +-
 regressions/ck_epoch/validate/torture.c       |  4 +-
 src/ck_epoch.c                                | 81 +++++++++++++------
 9 files changed, 107 insertions(+), 43 deletions(-)

diff --git a/include/ck_epoch.h b/include/ck_epoch.h
index 21bb52c..4ecbaf9 100644
--- a/include/ck_epoch.h
+++ b/include/ck_epoch.h
@@ -93,7 +93,7 @@ struct ck_epoch_record {
 	unsigned int n_pending;
 	unsigned int n_peak;
 	unsigned int n_dispatch;
-	unsigned int unused;
+	void *ct;
 	ck_stack_t pending[CK_EPOCH_LENGTH];
 	ck_stack_entry_t record_next;
 } CK_CC_CACHELINE;
@@ -113,6 +113,13 @@ typedef struct ck_epoch ck_epoch_t;
 void _ck_epoch_addref(ck_epoch_record_t *, ck_epoch_section_t *);
 void _ck_epoch_delref(ck_epoch_record_t *, ck_epoch_section_t *);
 
+CK_CC_FORCE_INLINE static void *
+ck_epoch_record_ct(const ck_epoch_record_t *record)
+{
+
+	return ck_pr_load_ptr(&record->ct);
+}
+
 /*
  * Marks the beginning of an epoch-protected section.
  */
@@ -185,7 +192,6 @@ ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
  * of the epoch counter. Worst case, this will result in some delays
  * before object destruction.
  */
-/*
 CK_CC_FORCE_INLINE static void
 ck_epoch_call(ck_epoch_record_t *record,
@@ -200,13 +206,12 @@ ck_epoch_call(ck_epoch_record_t *record,
 	ck_stack_push_spnc(&record->pending[offset], &entry->stack_entry);
 	return;
 }
-*/
 
 /*
  * Same as ck_epoch_call, but allows for records to be shared and is reentrant.
  */
 CK_CC_FORCE_INLINE static void
-ck_epoch_call(ck_epoch_record_t *record,
+ck_epoch_call_strict(ck_epoch_record_t *record,
 	      ck_epoch_entry_t *entry,
 	      ck_epoch_cb_t *function)
 {
@@ -222,12 +227,36 @@ ck_epoch_call(ck_epoch_record_t *record,
 	return;
 }
 
+/*
+ * This callback is used for synchronize_wait to allow for custom blocking
+ * behavior.
+ */
+typedef void ck_epoch_wait_cb_t(ck_epoch_t *, ck_epoch_record_t *,
+    void *);
+
 void ck_epoch_init(ck_epoch_t *);
-ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
-void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *);
+
+/*
+ * Attempts to recycle an unused epoch record. If one is successfully
+ * allocated, the record context pointer is also updated.
+ */
+ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *, void *);
+
+/*
+ * Registers an epoch record. An optional context pointer may be passed that
+ * is retrievable with ck_epoch_record_ct.
+ */
+void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *, void *);
+
+/*
+ * Marks a record as available for re-use by a subsequent recycle operation.
+ * Note that the record cannot be physically destroyed.
+ */
 void ck_epoch_unregister(ck_epoch_record_t *);
+
 bool ck_epoch_poll(ck_epoch_record_t *);
 void ck_epoch_synchronize(ck_epoch_record_t *);
+void ck_epoch_synchronize_wait(ck_epoch_t *, ck_epoch_wait_cb_t *, void *);
 void ck_epoch_barrier(ck_epoch_record_t *);
 void ck_epoch_reclaim(ck_epoch_record_t *);
 
diff --git a/regressions/ck_epoch/validate/ck_epoch_call.c b/regressions/ck_epoch/validate/ck_epoch_call.c
index 29e0df8..2ec6bd4 100644
--- a/regressions/ck_epoch/validate/ck_epoch_call.c
+++ b/regressions/ck_epoch/validate/ck_epoch_call.c
@@ -51,8 +51,8 @@ main(void)
 {
 	ck_epoch_entry_t entry;
 
-	ck_epoch_register(&epoch, &record[0]);
-	ck_epoch_register(&epoch, &record[1]);
+	ck_epoch_register(&epoch, &record[0], NULL);
+	ck_epoch_register(&epoch, &record[1], NULL);
 
 	ck_epoch_call(&record[1], &entry, cb);
 	ck_epoch_barrier(&record[1]);
diff --git a/regressions/ck_epoch/validate/ck_epoch_poll.c b/regressions/ck_epoch/validate/ck_epoch_poll.c
index 15613e8..4e8769b 100644
--- a/regressions/ck_epoch/validate/ck_epoch_poll.c
+++ b/regressions/ck_epoch/validate/ck_epoch_poll.c
@@ -89,7 +89,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record CK_CC_CACHELINE;
 	ck_stack_entry_t *cursor, *n;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -141,7 +141,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record;
 	ck_stack_entry_t *s;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
diff --git a/regressions/ck_epoch/validate/ck_epoch_section.c b/regressions/ck_epoch/validate/ck_epoch_section.c
index 12bcca1..769043b 100644
--- a/regressions/ck_epoch/validate/ck_epoch_section.c
+++ b/regressions/ck_epoch/validate/ck_epoch_section.c
@@ -46,8 +46,8 @@ setup_test(void)
 {
 
 	ck_epoch_init(&epc);
-	ck_epoch_register(&epc, &record);
-	ck_epoch_register(&epc, &record2);
+	ck_epoch_register(&epc, &record, NULL);
+	ck_epoch_register(&epc, &record2, NULL);
 	cleanup_calls = 0;
 
 	return;
@@ -157,7 +157,7 @@ reader_work(void *arg)
 	ck_epoch_section_t section;
 	struct obj *o;
 
-	ck_epoch_register(&epc, &local_record);
+	ck_epoch_register(&epc, &local_record, NULL);
 
 	o = (struct obj *)arg;
diff --git a/regressions/ck_epoch/validate/ck_epoch_section_2.c b/regressions/ck_epoch/validate/ck_epoch_section_2.c
index aed3661..daf6738 100644
--- a/regressions/ck_epoch/validate/ck_epoch_section_2.c
+++ b/regressions/ck_epoch/validate/ck_epoch_section_2.c
@@ -64,7 +64,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	record = malloc(sizeof *record);
 	assert(record != NULL);
 
-	ck_epoch_register(&epoch, record);
+	ck_epoch_register(&epoch, record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -133,7 +133,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record;
 	unsigned long iterations = 0;
 
-	ck_epoch_register(&epoch, &record);
+	ck_epoch_register(&epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
diff --git a/regressions/ck_epoch/validate/ck_epoch_synchronize.c b/regressions/ck_epoch/validate/ck_epoch_synchronize.c
index a96f4e2..c278334 100644
--- a/regressions/ck_epoch/validate/ck_epoch_synchronize.c
+++ b/regressions/ck_epoch/validate/ck_epoch_synchronize.c
@@ -91,7 +91,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	ck_stack_entry_t *n;
 	unsigned int i;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -148,7 +148,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	ck_epoch_record_t record;
 	ck_stack_entry_t *s;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
diff --git a/regressions/ck_epoch/validate/ck_stack.c b/regressions/ck_epoch/validate/ck_stack.c
index 6e77efe..6d493e1 100644
--- a/regressions/ck_epoch/validate/ck_stack.c
+++ b/regressions/ck_epoch/validate/ck_stack.c
@@ -81,7 +81,7 @@ thread(void *unused CK_CC_UNUSED)
 	unsigned long smr = 0;
 	unsigned int i;
 
-	ck_epoch_register(&stack_epoch, &record);
+	ck_epoch_register(&stack_epoch, &record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
diff --git a/regressions/ck_epoch/validate/torture.c b/regressions/ck_epoch/validate/torture.c
index fb3b1f4..f49d412 100644
--- a/regressions/ck_epoch/validate/torture.c
+++ b/regressions/ck_epoch/validate/torture.c
@@ -119,7 +119,7 @@ read_thread(void *unused CK_CC_UNUSED)
 	record = malloc(sizeof *record);
 	assert(record != NULL);
 
-	ck_epoch_register(&epoch, record);
+	ck_epoch_register(&epoch, record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
@@ -151,7 +151,7 @@ write_thread(void *unused CK_CC_UNUSED)
 	record = malloc(sizeof *record);
 	assert(record != NULL);
 
-	ck_epoch_register(&epoch, record);
+	ck_epoch_register(&epoch, record, NULL);
 
 	if (aff_iterate(&a)) {
 		perror("ERROR: failed to affine thread");
diff --git a/src/ck_epoch.c b/src/ck_epoch.c
index 018011c..0a26800 100644
--- a/src/ck_epoch.c
+++ b/src/ck_epoch.c
@@ -230,7 +230,7 @@ ck_epoch_init(struct ck_epoch *global)
 }
 
 struct ck_epoch_record *
-ck_epoch_recycle(struct ck_epoch *global)
+ck_epoch_recycle(struct ck_epoch *global, void *ct)
 {
 	struct ck_epoch_record *record;
 	ck_stack_entry_t *cursor;
@@ -249,6 +249,12 @@ ck_epoch_recycle(struct ck_epoch *global)
 		    CK_EPOCH_STATE_USED);
 		if (state == CK_EPOCH_STATE_FREE) {
 			ck_pr_dec_uint(&global->n_free);
+			ck_pr_store_ptr(&record->ct, ct);
+
+			/*
+			 * The context pointer is ordered by a
+			 * subsequent protected section.
+			 */
 			return record;
 		}
 	}
@@ -258,7 +264,8 @@ ck_epoch_recycle(struct ck_epoch *global)
 }
 
 void
-ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
+ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record,
+    void *ct)
 {
 	size_t i;
 
@@ -269,6 +276,7 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
 	record->n_dispatch = 0;
 	record->n_peak = 0;
 	record->n_pending = 0;
+	record->ct = ct;
 	memset(&record->local, 0, sizeof record->local);
 
 	for (i = 0; i < CK_EPOCH_LENGTH; i++)
@@ -295,6 +303,7 @@ ck_epoch_unregister(struct ck_epoch_record *record)
 	for (i = 0; i < CK_EPOCH_LENGTH; i++)
 		ck_stack_init(&record->pending[i]);
 
+	ck_pr_store_ptr(&record->ct, NULL);
 	ck_pr_fence_store();
 	ck_pr_store_uint(&record->state, CK_EPOCH_STATE_FREE);
 	ck_pr_inc_uint(&global->n_free);
@@ -365,8 +374,11 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
 	if (n_pending > n_peak)
 		ck_pr_store_uint(&record->n_peak, n_peak);
 
-	ck_pr_add_uint(&record->n_dispatch, i);
-	ck_pr_sub_uint(&record->n_pending, i);
+	if (i > 0) {
+		ck_pr_add_uint(&record->n_dispatch, i);
+		ck_pr_sub_uint(&record->n_pending, i);
+	}
+
 	return;
 }
 
@@ -384,13 +396,24 @@ ck_epoch_reclaim(struct ck_epoch_record *record)
 	return;
 }
 
+CK_CC_FORCE_INLINE static void
+epoch_block(struct ck_epoch *global, struct ck_epoch_record *cr,
+    ck_epoch_wait_cb_t *cb, void *ct)
+{
+
+	if (cb != NULL)
+		cb(global, cr, ct);
+
+	return;
+}
+
 /*
  * This function must not be called with-in read section.
  */
 void
-ck_epoch_synchronize(struct ck_epoch_record *record)
+ck_epoch_synchronize_wait(struct ck_epoch *global,
+    ck_epoch_wait_cb_t *cb, void *ct)
 {
-	struct ck_epoch *global = record->global;
 	struct ck_epoch_record *cr;
 	unsigned int delta, epoch, goal, i;
 	bool active;
@@ -427,10 +450,27 @@ ck_epoch_synchronize(struct ck_epoch_record *record)
 		 * period.
 		 */
 		e_d = ck_pr_load_uint(&global->epoch);
-		if (e_d != delta) {
-			delta = e_d;
-			goto reload;
+		if (e_d == delta) {
+			epoch_block(global, cr, cb, ct);
+			continue;
 		}
+
+		/*
+		 * If the epoch has been updated, we may have already
+		 * met our goal.
+		 */
+		delta = e_d;
+		if ((goal > epoch) & (delta >= goal))
+			goto leave;
+
+		epoch_block(global, cr, cb, ct);
+
+		/*
+		 * If the epoch has been updated, then a grace period
+		 * requires that all threads are observed idle at the
+		 * same epoch.
+		 */
+		cr = NULL;
 	}
 
 	/*
@@ -462,20 +502,6 @@ ck_epoch_synchronize(struct ck_epoch_record *record)
 		 * Otherwise, we have just acquired latest snapshot.
 		 */
 		delta = delta + r;
-		continue;
-
-reload:
-		if ((goal > epoch) & (delta >= goal)) {
-			/*
-			 * Right now, epoch overflow is handled as an edge
-			 * case. If we have already observed an epoch
-			 * generation, then we can be sure no hazardous
-			 * references exist to objects from this generation. We
-			 * can actually avoid an addtional scan step at this
-			 * point.
-			 */
-			break;
-		}
 	}
 
 	/*
@@ -483,10 +509,19 @@ reload:
 	 * However, if non-temporal instructions are used, full barrier
 	 * semantics are necessary.
 	 */
+leave:
 	ck_pr_fence_memory();
 	return;
 }
 
+void
+ck_epoch_synchronize(struct ck_epoch_record *record)
+{
+
+	ck_epoch_synchronize_wait(record->global, NULL, NULL);
+	return;
+}
+
 void
 ck_epoch_barrier(struct ck_epoch_record *record)
 {
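
For illustration, here is a minimal usage sketch of the new interface. It is not part of the patch: the names epoch, block_cb, and retire are hypothetical, and a real consumer would more likely park the thread on a futex or condition variable than yield the processor.

#include <ck_epoch.h>
#include <sched.h>
#include <stdlib.h>

static ck_epoch_t epoch;	/* Assumed already initialized with ck_epoch_init. */

/*
 * Hypothetical wait callback: ck_epoch_synchronize_wait invokes it whenever
 * the grace period fails to make forward progress. "cr" is a record that may
 * still be active in the oldest epoch, and "ct" is the opaque pointer passed
 * as the last argument to ck_epoch_synchronize_wait. Here we simply yield.
 */
static void
block_cb(ck_epoch_t *e, ck_epoch_record_t *cr, void *ct)
{

	(void)e;
	(void)cr;
	(void)ct;
	sched_yield();
	return;
}

static void
retire(void *object)
{
	ck_epoch_record_t record;

	/* The new third argument is an opaque per-record context pointer. */
	ck_epoch_register(&epoch, &record, NULL);

	/* ... unlink "object" from all shared data structures ... */

	/* Wait out a full grace period, blocking via block_cb instead of spinning. */
	ck_epoch_synchronize_wait(&epoch, block_cb, NULL);
	free(object);

	ck_epoch_unregister(&record);
	return;
}

Note that, unlike ck_epoch_synchronize (now a thin wrapper that passes a NULL callback), ck_epoch_synchronize_wait takes the global ck_epoch_t directly, so a thread does not need a registered record merely to wait for a grace period.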