ck_epoch: allow record sharing and reentrancy for write-side operations.

ck_pring
Samy Al Bahra 8 years ago
parent 8c12481577
commit a25e073a2b

include/ck_epoch.h

@@ -92,7 +92,8 @@ struct ck_epoch_record {
 	} local CK_CC_CACHELINE;
 	unsigned int n_pending;
 	unsigned int n_peak;
-	unsigned long n_dispatch;
+	unsigned int n_dispatch;
+	unsigned int unused;
 	ck_stack_t pending[CK_EPOCH_LENGTH];
 	ck_stack_entry_t record_next;
 } CK_CC_CACHELINE;
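
The struct change above pairs with the counter updates later in the diff: n_dispatch is narrowed to unsigned int so it can be driven through the ck_pr_*_uint atomics, and an explicit unused pad keeps the record layout stable. A minimal sketch of that update pattern, using a hypothetical counters struct that is not part of ck:

#include <ck_pr.h>

/* Hypothetical counter pair mirroring the record fields; not from ck itself. */
struct counters {
	unsigned int n_pending;
	unsigned int n_dispatch;
};

static void
account_dispatch(struct counters *c, unsigned int n)
{

	/* Atomic read-modify-write updates, safe when several threads share c. */
	ck_pr_add_uint(&c->n_dispatch, n);
	ck_pr_sub_uint(&c->n_pending, n);
	return;
}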
@@ -179,7 +180,12 @@ ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
  * Defers the execution of the function pointed to by the "cb"
  * argument until an epoch counter loop. This allows for a
  * non-blocking deferral.
+ *
+ * We can get away without a fence here due to the monotonic nature
+ * of the epoch counter. Worst case, this will result in some delays
+ * before object destruction.
  */
+/*
 CK_CC_FORCE_INLINE static void
 ck_epoch_call(ck_epoch_record_t *record,
 	      ck_epoch_entry_t *entry,
@@ -194,6 +200,27 @@ ck_epoch_call(ck_epoch_record_t *record,
 	ck_stack_push_spnc(&record->pending[offset], &entry->stack_entry);
 	return;
 }
+*/
+
+/*
+ * Same as ck_epoch_call, but allows for records to be shared and is reentrant.
+ */
+CK_CC_FORCE_INLINE static void
+ck_epoch_call(ck_epoch_record_t *record,
+	      ck_epoch_entry_t *entry,
+	      ck_epoch_cb_t *function)
+{
+	struct ck_epoch *epoch = record->global;
+	unsigned int e = ck_pr_load_uint(&epoch->epoch);
+	unsigned int offset = e & (CK_EPOCH_LENGTH - 1);
+
+	ck_pr_inc_uint(&record->n_pending);
+	entry->function = function;
+
+	/* Store fence is implied by push operation. */
+	ck_stack_push_upmc(&record->pending[offset], &entry->stack_entry);
+	return;
+}
 
 void ck_epoch_init(ck_epoch_t *);
 ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
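
With the reentrant ck_epoch_call, n_pending is bumped with an atomic increment and the deferral is pushed with the multi-producer ck_stack_push_upmc, so several writer threads can defer callbacks through one record. A rough usage sketch follows, assuming a record that has already been registered with ck_epoch_register; the node type and helper names are illustrative, not from ck:

#include <ck_epoch.h>
#include <stdlib.h>

struct node {
	ck_epoch_entry_t epoch_entry;	/* First member: entry pointer == node pointer. */
	int value;
};

/* Assumed already registered against an initialized ck_epoch_t domain. */
static ck_epoch_record_t shared_record;

static void
node_destroy(ck_epoch_entry_t *e)
{

	free((struct node *)e);
	return;
}

/* Safe to call from any writer thread sharing shared_record. */
static void
node_retire(struct node *n)
{

	ck_epoch_call(&shared_record, &n->epoch_entry, node_destroy);
	return;
}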

src/ck_epoch.c

@@ -345,11 +345,10 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
 {
 	unsigned int epoch = e & (CK_EPOCH_LENGTH - 1);
 	ck_stack_entry_t *head, *next, *cursor;
+	unsigned int n_pending, n_peak;
 	unsigned int i = 0;
 
-	head = CK_STACK_FIRST(&record->pending[epoch]);
-	ck_stack_init(&record->pending[epoch]);
-
+	head = ck_stack_batch_pop_upmc(&record->pending[epoch]);
 	for (cursor = head; cursor != NULL; cursor = next) {
 		struct ck_epoch_entry *entry =
 		    ck_epoch_entry_container(cursor);
@@ -359,11 +358,15 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
 		i++;
 	}
 
-	if (record->n_pending > record->n_peak)
-		record->n_peak = record->n_pending;
+	n_peak = ck_pr_load_uint(&record->n_peak);
+	n_pending = ck_pr_load_uint(&record->n_pending);
 
-	record->n_dispatch += i;
-	record->n_pending -= i;
+	/* We don't require accuracy around peak calculation. */
+	if (n_pending > n_peak)
+		ck_pr_store_uint(&record->n_peak, n_peak);
+
+	ck_pr_add_uint(&record->n_dispatch, i);
+	ck_pr_sub_uint(&record->n_pending, i);
 
 	return;
 }
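
On the dispatch side, ck_stack_batch_pop_upmc atomically detaches the whole pending stack, so deferrals pushed concurrently by sharing writers are not lost between the old CK_STACK_FIRST read and the ck_stack_init reset. A small sketch of that detach-then-walk pattern, using an illustrative item type rather than ck_epoch_entry:

#include <ck_stack.h>

struct item {
	ck_stack_entry_t entry;		/* First member: entry pointer == item pointer. */
	void (*fn)(struct item *);
};

/* Assumed set up once with ck_stack_init(&pending). */
static ck_stack_t pending;

/* Producers may call this concurrently from any thread sharing the stack. */
static void
item_defer(struct item *it)
{

	ck_stack_push_upmc(&pending, &it->entry);
	return;
}

/* Consumer: atomically takes ownership of everything pushed so far. */
static void
item_drain(void)
{
	ck_stack_entry_t *cursor, *next;

	for (cursor = ck_stack_batch_pop_upmc(&pending);
	    cursor != NULL; cursor = next) {
		struct item *it = (struct item *)cursor;

		/* Read the link before the callback potentially frees the item. */
		next = CK_STACK_NEXT(cursor);
		it->fn(it);
	}

	return;
}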
