diff --git a/include/ck_epoch.h b/include/ck_epoch.h
index e7ce5bc..21bb52c 100644
--- a/include/ck_epoch.h
+++ b/include/ck_epoch.h
@@ -92,7 +92,8 @@ struct ck_epoch_record {
 	} local CK_CC_CACHELINE;
 	unsigned int n_pending;
 	unsigned int n_peak;
-	unsigned long n_dispatch;
+	unsigned int n_dispatch;
+	unsigned int unused;
 	ck_stack_t pending[CK_EPOCH_LENGTH];
 	ck_stack_entry_t record_next;
 } CK_CC_CACHELINE;
@@ -179,7 +180,12 @@ ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
  * Defers the execution of the function pointed to by the "cb"
  * argument until an epoch counter loop. This allows for a
  * non-blocking deferral.
+ *
+ * We can get away without a fence here due to the monotonic nature
+ * of the epoch counter. Worst case, this will result in some delays
+ * before object destruction.
  */
+/*
 CK_CC_FORCE_INLINE static void
 ck_epoch_call(ck_epoch_record_t *record,
 	      ck_epoch_entry_t *entry,
@@ -194,6 +200,27 @@ ck_epoch_call(ck_epoch_record_t *record,
 	ck_stack_push_spnc(&record->pending[offset], &entry->stack_entry);
 	return;
 }
+*/
+
+/*
+ * Same as ck_epoch_call, but allows for records to be shared and is reentrant.
+ */
+CK_CC_FORCE_INLINE static void
+ck_epoch_call(ck_epoch_record_t *record,
+	      ck_epoch_entry_t *entry,
+	      ck_epoch_cb_t *function)
+{
+	struct ck_epoch *epoch = record->global;
+	unsigned int e = ck_pr_load_uint(&epoch->epoch);
+	unsigned int offset = e & (CK_EPOCH_LENGTH - 1);
+
+	ck_pr_inc_uint(&record->n_pending);
+	entry->function = function;
+
+	/* Store fence is implied by push operation. */
+	ck_stack_push_upmc(&record->pending[offset], &entry->stack_entry);
+	return;
+}
 
 void ck_epoch_init(ck_epoch_t *);
 ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
diff --git a/src/ck_epoch.c b/src/ck_epoch.c
index 69ba6e0..018011c 100644
--- a/src/ck_epoch.c
+++ b/src/ck_epoch.c
@@ -345,11 +345,10 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
 {
 	unsigned int epoch = e & (CK_EPOCH_LENGTH - 1);
 	ck_stack_entry_t *head, *next, *cursor;
+	unsigned int n_pending, n_peak;
 	unsigned int i = 0;
 
-	head = CK_STACK_FIRST(&record->pending[epoch]);
-	ck_stack_init(&record->pending[epoch]);
-
+	head = ck_stack_batch_pop_upmc(&record->pending[epoch]);
 	for (cursor = head; cursor != NULL; cursor = next) {
 		struct ck_epoch_entry *entry =
 		    ck_epoch_entry_container(cursor);
@@ -359,11 +358,15 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
 		i++;
 	}
 
-	if (record->n_pending > record->n_peak)
-		record->n_peak = record->n_pending;
+	n_peak = ck_pr_load_uint(&record->n_peak);
+	n_pending = ck_pr_load_uint(&record->n_pending);
+
+	/* We don't require accuracy around peak calculation. */
+	if (n_pending > n_peak)
+		ck_pr_store_uint(&record->n_peak, n_pending);
 
-	record->n_dispatch += i;
-	record->n_pending -= i;
+	ck_pr_add_uint(&record->n_dispatch, i);
+	ck_pr_sub_uint(&record->n_pending, i);
 	return;
 }
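
The reworked ck_epoch_call is reentrant and safe for records shared between threads: ck_stack_push_upmc tolerates concurrent producers on the per-epoch pending stack, and ck_stack_batch_pop_upmc lets the dispatcher claim the whole stack atomically rather than racing CK_STACK_FIRST against ck_stack_init. Below is a minimal usage sketch of deferring destruction through a possibly shared record. The struct node, node_destroy and node_retire names are hypothetical; only ck_epoch_call, ck_epoch_entry_t and ck_epoch_cb_t are taken from the header above.

#include <stddef.h>
#include <stdlib.h>

#include <ck_epoch.h>

/* Hypothetical object protected by epoch-based reclamation. */
struct node {
	int value;
	ck_epoch_entry_t epoch_entry;
};

/* Deferred destructor: recover the node from its embedded epoch entry. */
static void
node_destroy(ck_epoch_entry_t *entry)
{
	struct node *n;

	n = (struct node *)((char *)entry - offsetof(struct node, epoch_entry));
	free(n);
	return;
}

/*
 * Defer destruction of a node that concurrent readers may still hold.
 * With the reentrant ck_epoch_call, multiple writer threads may share
 * one record when queueing callbacks; the upmc push keeps the pending
 * stack consistent under concurrent producers.
 */
static void
node_retire(ck_epoch_record_t *record, struct node *n)
{

	ck_epoch_call(record, &n->epoch_entry, node_destroy);
	return;
}

As the new header comment notes, skipping a fence around the epoch load only delays reclamation: node_destroy runs later, from ck_epoch_dispatch, once the epoch counter has advanced past all active readers.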