Merge pull request #93 from concurrencykit/res

ck_epoch: merge in res branch.
ck_pring
Samy Al Bahra 8 years ago committed by GitHub
commit 87196ff37f

@ -92,7 +92,8 @@ struct ck_epoch_record {
} local CK_CC_CACHELINE;
unsigned int n_pending;
unsigned int n_peak;
unsigned long n_dispatch;
unsigned int n_dispatch;
void *ct;
ck_stack_t pending[CK_EPOCH_LENGTH];
ck_stack_entry_t record_next;
} CK_CC_CACHELINE;
@ -112,6 +113,13 @@ typedef struct ck_epoch ck_epoch_t;
void _ck_epoch_addref(ck_epoch_record_t *, ck_epoch_section_t *);
void _ck_epoch_delref(ck_epoch_record_t *, ck_epoch_section_t *);
CK_CC_FORCE_INLINE static void *
ck_epoch_record_ct(const ck_epoch_record_t *record)
{
return ck_pr_load_ptr(&record->ct);
}
/*
* Marks the beginning of an epoch-protected section.
*/
@ -160,9 +168,10 @@ ck_epoch_begin(ck_epoch_record_t *record, ck_epoch_section_t *section)
}
/*
* Marks the end of an epoch-protected section.
* Marks the end of an epoch-protected section. Returns true if no more
* sections exist for the caller.
*/
CK_CC_FORCE_INLINE static void
CK_CC_FORCE_INLINE static bool
ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
{
@ -170,15 +179,19 @@ ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
ck_pr_store_uint(&record->active, record->active - 1);
if (section != NULL)
_ck_epoch_delref(record, section);
return _ck_epoch_delref(record, section);
return;
return record->active == 0;
}
/*
* Defers the execution of the function pointed to by the "cb"
* argument until an epoch counter loop. This allows for a
* non-blocking deferral.
*
* We can get away without a fence here due to the monotonic nature
* of the epoch counter. Worst case, this will result in some delays
* before object destruction.
*/
CK_CC_FORCE_INLINE static void
ck_epoch_call(ck_epoch_record_t *record,
@ -195,13 +208,74 @@ ck_epoch_call(ck_epoch_record_t *record,
return;
}
/*
* Same as ck_epoch_call, but allows for records to be shared and is reentrant.
*/
CK_CC_FORCE_INLINE static void
ck_epoch_call_strict(ck_epoch_record_t *record,
ck_epoch_entry_t *entry,
ck_epoch_cb_t *function)
{
struct ck_epoch *epoch = record->global;
unsigned int e = ck_pr_load_uint(&epoch->epoch);
unsigned int offset = e & (CK_EPOCH_LENGTH - 1);
ck_pr_inc_uint(&record->n_pending);
entry->function = function;
/* Store fence is implied by push operation. */
ck_stack_push_upmc(&record->pending[offset], &entry->stack_entry);
return;
}
/*
* This callback is used for synchronize_wait to allow for custom blocking
* behavior.
*/
typedef void ck_epoch_wait_cb_t(ck_epoch_t *, ck_epoch_record_t *,
void *);
/*
* Return latest epoch value. This operation provides load ordering.
*/
CK_CC_FORCE_INLINE static unsigned int
ck_epoch_value(const ck_epoch_t *ep)
{
ck_pr_fence_load();
return ck_pr_load_uint(&ep->epoch);
}
void ck_epoch_init(ck_epoch_t *);
ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *);
/*
* Attempts to recycle an unused epoch record. If one is successfully
* allocated, the record context pointer is also updated.
*/
ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *, void *);
/*
* Registers an epoch record. An optional context pointer may be passed that
* is retrievable with ck_epoch_record_ct.
*/
void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *, void *);
/*
* Marks a record as available for re-use by a subsequent recycle operation.
* Note that the record cannot be physically destroyed.
*/
void ck_epoch_unregister(ck_epoch_record_t *);
bool ck_epoch_poll(ck_epoch_record_t *);
void ck_epoch_synchronize(ck_epoch_record_t *);
void ck_epoch_synchronize_wait(ck_epoch_t *, ck_epoch_wait_cb_t *, void *);
void ck_epoch_barrier(ck_epoch_record_t *);
void ck_epoch_barrier_wait(ck_epoch_record_t *, ck_epoch_wait_cb_t *, void *);
/*
* Reclaim entries associated with a record. This is safe to call only on
* the caller's record or records that are using call_strict.
*/
void ck_epoch_reclaim(ck_epoch_record_t *);
#endif /* CK_EPOCH_H */

@ -37,6 +37,7 @@ static void
cb(ck_epoch_entry_t *p)
{
/* Test that we can reregister the callback. */
if (counter == 0)
ck_epoch_call(&record[1], p, cb);
@ -50,15 +51,22 @@ int
main(void)
{
ck_epoch_entry_t entry;
ck_epoch_entry_t another;
ck_epoch_register(&epoch, &record[0]);
ck_epoch_register(&epoch, &record[1]);
ck_epoch_register(&epoch, &record[0], NULL);
ck_epoch_register(&epoch, &record[1], NULL);
ck_epoch_call(&record[1], &entry, cb);
ck_epoch_barrier(&record[1]);
ck_epoch_barrier(&record[1]);
if (counter != 2)
ck_error("Expected counter value 2, read %u.\n", counter);
/* Make sure that strict works. */
ck_epoch_call_strict(&record[1], &entry, cb);
ck_epoch_call_strict(&record[1], &another, cb);
ck_epoch_barrier(&record[1]);
if (counter != 4)
ck_error("Expected counter value 4, read %u.\n", counter);
return 0;
}

@ -89,7 +89,7 @@ read_thread(void *unused CK_CC_UNUSED)
ck_epoch_record_t record CK_CC_CACHELINE;
ck_stack_entry_t *cursor, *n;
ck_epoch_register(&stack_epoch, &record);
ck_epoch_register(&stack_epoch, &record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -141,7 +141,7 @@ write_thread(void *unused CK_CC_UNUSED)
ck_epoch_record_t record;
ck_stack_entry_t *s;
ck_epoch_register(&stack_epoch, &record);
ck_epoch_register(&stack_epoch, &record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -191,7 +191,7 @@ write_thread(void *unused CK_CC_UNUSED)
ck_epoch_barrier(&record);
if (tid == 0) {
fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n Reclamations: %lu\n\n",
fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n Reclamations: %u\n\n",
record.n_peak,
(double)record.n_peak / ((double)PAIRS_S * ITERATE_S) * 100,
record.n_dispatch);

@ -46,8 +46,8 @@ setup_test(void)
{
ck_epoch_init(&epc);
ck_epoch_register(&epc, &record);
ck_epoch_register(&epc, &record2);
ck_epoch_register(&epc, &record, NULL);
ck_epoch_register(&epc, &record2, NULL);
cleanup_calls = 0;
return;
@ -88,7 +88,8 @@ test_simple_read_section(void)
ck_epoch_begin(&record, &section);
ck_epoch_call(&record, &entry, cleanup);
assert(cleanup_calls == 0);
ck_epoch_end(&record, &section);
if (ck_epoch_end(&record, &section) == false)
ck_error("expected no more sections");
ck_epoch_barrier(&record);
assert(cleanup_calls == 1);
@ -157,7 +158,7 @@ reader_work(void *arg)
ck_epoch_section_t section;
struct obj *o;
ck_epoch_register(&epc, &local_record);
ck_epoch_register(&epc, &local_record, NULL);
o = (struct obj *)arg;

@ -64,7 +64,7 @@ read_thread(void *unused CK_CC_UNUSED)
record = malloc(sizeof *record);
assert(record != NULL);
ck_epoch_register(&epoch, record);
ck_epoch_register(&epoch, record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -133,7 +133,7 @@ write_thread(void *unused CK_CC_UNUSED)
ck_epoch_record_t record;
unsigned long iterations = 0;
ck_epoch_register(&epoch, &record);
ck_epoch_register(&epoch, &record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");

@ -91,7 +91,7 @@ read_thread(void *unused CK_CC_UNUSED)
ck_stack_entry_t *n;
unsigned int i;
ck_epoch_register(&stack_epoch, &record);
ck_epoch_register(&stack_epoch, &record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -148,7 +148,7 @@ write_thread(void *unused CK_CC_UNUSED)
ck_epoch_record_t record;
ck_stack_entry_t *s;
ck_epoch_register(&stack_epoch, &record);
ck_epoch_register(&stack_epoch, &record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -204,7 +204,7 @@ write_thread(void *unused CK_CC_UNUSED)
ck_epoch_synchronize(&record);
if (tid == 0) {
fprintf(stderr, "[W] Peak: %u (%2.2f%%)\n Reclamations: %lu\n\n",
fprintf(stderr, "[W] Peak: %u (%2.2f%%)\n Reclamations: %u\n\n",
record.n_peak,
(double)record.n_peak / ((double)PAIRS_S * ITERATE_S) * 100,
record.n_dispatch);

@ -81,7 +81,7 @@ thread(void *unused CK_CC_UNUSED)
unsigned long smr = 0;
unsigned int i;
ck_epoch_register(&stack_epoch, &record);
ck_epoch_register(&stack_epoch, &record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -118,7 +118,7 @@ thread(void *unused CK_CC_UNUSED)
while (ck_pr_load_uint(&e_barrier) < n_threads);
fprintf(stderr, "Deferrals: %lu (%2.2f)\n", smr, (double)smr / PAIRS);
fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %lu\n\n",
fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %u\n\n",
record.n_peak,
(double)record.n_peak / PAIRS * 100,
record.n_pending,

@ -119,7 +119,7 @@ read_thread(void *unused CK_CC_UNUSED)
record = malloc(sizeof *record);
assert(record != NULL);
ck_epoch_register(&epoch, record);
ck_epoch_register(&epoch, record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
@ -151,7 +151,7 @@ write_thread(void *unused CK_CC_UNUSED)
record = malloc(sizeof *record);
assert(record != NULL);
ck_epoch_register(&epoch, record);
ck_epoch_register(&epoch, record, NULL);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");

@ -139,7 +139,7 @@ CK_STACK_CONTAINER(struct ck_epoch_entry, stack_entry,
#define CK_EPOCH_SENSE_MASK (CK_EPOCH_SENSE - 1)
void
bool
_ck_epoch_delref(struct ck_epoch_record *record,
struct ck_epoch_section *section)
{
@ -150,7 +150,7 @@ _ck_epoch_delref(struct ck_epoch_record *record,
current->count--;
if (current->count > 0)
return;
return false;
/*
* If the current bucket no longer has any references, then
@ -161,8 +161,7 @@ _ck_epoch_delref(struct ck_epoch_record *record,
* If no other active bucket exists, then the record will go
* inactive in order to allow for forward progress.
*/
other = &record->local.bucket[(i + 1) &
CK_EPOCH_SENSE_MASK];
other = &record->local.bucket[(i + 1) & CK_EPOCH_SENSE_MASK];
if (other->count > 0 &&
((int)(current->epoch - other->epoch) < 0)) {
/*
@ -172,7 +171,7 @@ _ck_epoch_delref(struct ck_epoch_record *record,
ck_pr_store_uint(&record->epoch, other->epoch);
}
return;
return true;
}
void
@ -230,7 +229,7 @@ ck_epoch_init(struct ck_epoch *global)
}
struct ck_epoch_record *
ck_epoch_recycle(struct ck_epoch *global)
ck_epoch_recycle(struct ck_epoch *global, void *ct)
{
struct ck_epoch_record *record;
ck_stack_entry_t *cursor;
@ -249,6 +248,12 @@ ck_epoch_recycle(struct ck_epoch *global)
CK_EPOCH_STATE_USED);
if (state == CK_EPOCH_STATE_FREE) {
ck_pr_dec_uint(&global->n_free);
ck_pr_store_ptr(&record->ct, ct);
/*
* The context pointer is ordered by a
* subsequent protected section.
*/
return record;
}
}
@ -258,7 +263,8 @@ ck_epoch_recycle(struct ck_epoch *global)
}
void
ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record,
void *ct)
{
size_t i;
@ -269,6 +275,7 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
record->n_dispatch = 0;
record->n_peak = 0;
record->n_pending = 0;
record->ct = ct;
memset(&record->local, 0, sizeof record->local);
for (i = 0; i < CK_EPOCH_LENGTH; i++)
@ -295,6 +302,7 @@ ck_epoch_unregister(struct ck_epoch_record *record)
for (i = 0; i < CK_EPOCH_LENGTH; i++)
ck_stack_init(&record->pending[i]);
ck_pr_store_ptr(&record->ct, NULL);
ck_pr_fence_store();
ck_pr_store_uint(&record->state, CK_EPOCH_STATE_FREE);
ck_pr_inc_uint(&global->n_free);
@ -345,11 +353,10 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
{
unsigned int epoch = e & (CK_EPOCH_LENGTH - 1);
ck_stack_entry_t *head, *next, *cursor;
unsigned int n_pending, n_peak;
unsigned int i = 0;
head = CK_STACK_FIRST(&record->pending[epoch]);
ck_stack_init(&record->pending[epoch]);
head = ck_stack_batch_pop_upmc(&record->pending[epoch]);
for (cursor = head; cursor != NULL; cursor = next) {
struct ck_epoch_entry *entry =
ck_epoch_entry_container(cursor);
@ -359,11 +366,18 @@ ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
i++;
}
if (record->n_pending > record->n_peak)
record->n_peak = record->n_pending;
n_peak = ck_pr_load_uint(&record->n_peak);
n_pending = ck_pr_load_uint(&record->n_pending);
/* We don't require accuracy around peak calculation. */
if (n_pending > n_peak)
ck_pr_store_uint(&record->n_peak, n_peak);
if (i > 0) {
ck_pr_add_uint(&record->n_dispatch, i);
ck_pr_sub_uint(&record->n_pending, i);
}
record->n_dispatch += i;
record->n_pending -= i;
return;
}
@ -381,13 +395,24 @@ ck_epoch_reclaim(struct ck_epoch_record *record)
return;
}
CK_CC_FORCE_INLINE static void
epoch_block(struct ck_epoch *global, struct ck_epoch_record *cr,
ck_epoch_wait_cb_t *cb, void *ct)
{
if (cb != NULL)
cb(global, cr, ct);
return;
}
/*
* This function must not be called with-in read section.
*/
void
ck_epoch_synchronize(struct ck_epoch_record *record)
ck_epoch_synchronize_wait(struct ck_epoch *global,
ck_epoch_wait_cb_t *cb, void *ct)
{
struct ck_epoch *global = record->global;
struct ck_epoch_record *cr;
unsigned int delta, epoch, goal, i;
bool active;
@ -424,10 +449,27 @@ ck_epoch_synchronize(struct ck_epoch_record *record)
* period.
*/
e_d = ck_pr_load_uint(&global->epoch);
if (e_d != delta) {
delta = e_d;
goto reload;
if (e_d == delta) {
epoch_block(global, cr, cb, ct);
continue;
}
/*
* If the epoch has been updated, we may have already
* met our goal.
*/
delta = e_d;
if ((goal > epoch) & (delta >= goal))
goto leave;
epoch_block(global, cr, cb, ct);
/*
* If the epoch has been updated, then a grace period
* requires that all threads are observed idle at the
* same epoch.
*/
cr = NULL;
}
/*
@ -459,20 +501,6 @@ ck_epoch_synchronize(struct ck_epoch_record *record)
* Otherwise, we have just acquired latest snapshot.
*/
delta = delta + r;
continue;
reload:
if ((goal > epoch) & (delta >= goal)) {
/*
* Right now, epoch overflow is handled as an edge
* case. If we have already observed an epoch
* generation, then we can be sure no hazardous
* references exist to objects from this generation. We
* can actually avoid an addtional scan step at this
* point.
*/
break;
}
}
/*
@ -480,10 +508,19 @@ reload:
* However, if non-temporal instructions are used, full barrier
* semantics are necessary.
*/
leave:
ck_pr_fence_memory();
return;
}
void
ck_epoch_synchronize(struct ck_epoch_record *record)
{
ck_epoch_synchronize_wait(record->global, NULL, NULL);
return;
}
void
ck_epoch_barrier(struct ck_epoch_record *record)
{
@ -493,6 +530,16 @@ ck_epoch_barrier(struct ck_epoch_record *record)
return;
}
void
ck_epoch_barrier_wait(struct ck_epoch_record *record, ck_epoch_wait_cb_t *cb,
void *ct)
{
ck_epoch_synchronize_wait(record->global, cb, ct);
ck_epoch_reclaim(record);
return;
}
/*
* It may be worth it to actually apply these deferral semantics to an epoch
* that was observed at ck_epoch_call time. The problem is that the latter

Loading…
Cancel
Save