ck_epoch: Redesigned and improved unit test and observability.

ck_epoch_reclaim is now the replacement for ck_epoch_flush.
ck_epoch_purge guarantees that all entries are reclaimed
for the provided record before exiting.

n_peak counter has been added, which provides the peak number
of items across all reclamation lists. n_reclamations provides
the number of reclamations across the lifetime of the record.
These are cleared on unregister.

ck_epoch_update has been renamed to ck_epoch_tick.

Hazardous sections which mutate shared structures are now
expected to begin with ck_epoch_write_begin and end with
ck_epoch_end.

Hazardous sections which read shared structures are now
expected to begin with ck_epoch_read_begin and end with
ck_epoch_end.

ck_hp_free is now more aggressive. It will attempt a
reclamation cycle any time the pending count is long.
I should probably add a ck_hp_retire to have a version
which allows for bulk updates to local reclamation lists.
ck_pring
Samy Al Bahra 14 years ago
parent 492faed9a3
commit 83f1436f84

@ -55,12 +55,16 @@ enum {
struct ck_epoch;
struct ck_epoch_record {
unsigned int active;
unsigned int status;
unsigned int epoch;
ck_stack_t pending[CK_EPOCH_LENGTH];
unsigned int n_pending;
unsigned int status;
unsigned int delta;
unsigned int n_peak;
uint64_t n_reclamations;
struct ck_epoch *global;
ck_stack_entry_t record_next;
} CK_CC_CACHELINE;
typedef struct ck_epoch_record ck_epoch_record_t;
@ -125,6 +129,9 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
record->active = 0;
record->epoch = 0;
record->delta = 0;
record->n_pending = 0;
record->n_peak = 0;
record->n_reclamations = 0;
record->global = global;
for (i = 0; i < CK_EPOCH_LENGTH; i++)
@ -146,7 +153,7 @@ ck_epoch_unregister(struct ck_epoch_record *record)
}
CK_CC_INLINE static void
ck_epoch_update(struct ck_epoch *global, struct ck_epoch_record *record)
ck_epoch_tick(struct ck_epoch *global, struct ck_epoch_record *record)
{
struct ck_epoch_record *c_record;
ck_stack_entry_t *cursor;
@ -167,35 +174,16 @@ ck_epoch_update(struct ck_epoch *global, struct ck_epoch_record *record)
return;
}
CK_CC_INLINE static void
ck_epoch_activate(struct ck_epoch_record *record)
{
ck_pr_store_uint(&record->active, 1);
ck_pr_fence_store();
return;
}
CK_CC_INLINE static void
ck_epoch_deactivate(struct ck_epoch_record *record)
{
ck_pr_fence_store();
ck_pr_store_uint(&record->active, 0);
return;
}
CK_CC_INLINE static void
ck_epoch_start(struct ck_epoch_record *record)
CK_CC_INLINE static bool
ck_epoch_reclaim(struct ck_epoch_record *record)
{
struct ck_epoch *global = record->global;
unsigned int g_epoch;
for (;;) {
g_epoch = ck_pr_load_uint(&global->epoch);
if (record->epoch != g_epoch) {
unsigned int g_epoch = ck_pr_load_uint(&global->epoch);
unsigned int epoch = record->epoch;
ck_stack_entry_t *next, *cursor;
unsigned int epoch = record->epoch & (CK_EPOCH_LENGTH - 1);
if (epoch == g_epoch)
return false;
/*
* This means all threads with a potential reference to a
@ -203,19 +191,34 @@ ck_epoch_start(struct ck_epoch_record *record)
* the calling thread. No active reference should exist to
* any object in the record's pending list.
*/
CK_STACK_FOREACH_SAFE(&record->pending[epoch], cursor, next)
CK_STACK_FOREACH_SAFE(&record->pending[epoch], cursor, next) {
global->destroy(cursor);
record->n_pending--;
record->n_reclamations++;
}
ck_stack_init(&record->pending[epoch]);
ck_pr_store_uint(&record->epoch, g_epoch);
record->epoch = g_epoch & (CK_EPOCH_LENGTH - 1);
record->delta = 0;
return true;
}
CK_CC_INLINE static void
ck_epoch_write_begin(struct ck_epoch_record *record)
{
struct ck_epoch *global = record->global;
ck_pr_store_uint(&record->active, 1);
ck_pr_fence_store();
for (;;) {
if (ck_epoch_reclaim(record) == true)
break;
}
if (++record->delta >= global->threshold) {
record->delta = 0;
ck_epoch_update(global, record);
ck_epoch_tick(global, record);
continue;
}
@ -226,45 +229,53 @@ ck_epoch_start(struct ck_epoch_record *record)
}
CK_CC_INLINE static void
ck_epoch_stop(struct ck_epoch_record *record CK_CC_UNUSED)
ck_epoch_read_begin(struct ck_epoch_record *record)
{
ck_pr_store_uint(&record->active, 1);
ck_pr_fence_store();
return;
}
CK_CC_INLINE static void
ck_epoch_begin(struct ck_epoch_record *record)
ck_epoch_end(struct ck_epoch_record *record)
{
ck_epoch_activate(record);
ck_epoch_start(record);
ck_pr_fence_store();
ck_pr_store_uint(&record->active, 0);
return;
}
CK_CC_INLINE static void
ck_epoch_end(struct ck_epoch_record *record)
ck_epoch_free(struct ck_epoch_record *record, ck_stack_entry_t *entry)
{
struct ck_epoch *global = record->global;
unsigned int epoch = record->epoch;
ck_epoch_deactivate(record);
return;
}
ck_stack_push_spnc(&record->pending[epoch], entry);
record->n_pending += 1;
CK_CC_INLINE static void
ck_epoch_flush(struct ck_epoch_record *record)
{
if (record->n_pending > record->n_peak)
record->n_peak = record->n_pending;
if (record->n_pending >= global->threshold && ck_epoch_reclaim(record) == false)
ck_epoch_tick(global, record);
ck_epoch_update(record->global, record);
ck_epoch_start(record);
return;
}
CK_CC_INLINE static void
ck_epoch_free(struct ck_epoch_record *record, ck_stack_entry_t *entry)
ck_epoch_purge(struct ck_epoch_record *record)
{
unsigned int epoch = ck_pr_load_uint(&record->epoch) & (CK_EPOCH_LENGTH - 1);
ck_backoff_t backoff = CK_BACKOFF_INITIALIZER;
while (record->n_pending > 0) {
if (ck_epoch_reclaim(record) == false)
ck_epoch_tick(record->global, record);
else if (record->n_pending > 0)
ck_backoff_gb(&backoff);
}
ck_stack_push_spnc(&record->pending[epoch], entry);
record->delta++;
return;
}

@ -50,7 +50,7 @@ static unsigned int barrier;
static unsigned int e_barrier;
#ifndef PAIRS
#define PAIRS 1000000
#define PAIRS 5000000
#endif
struct node {
@ -95,11 +95,11 @@ thread(void *unused CK_CC_UNUSED)
while (ck_pr_load_uint(&barrier) < n_threads);
for (i = 0; i < PAIRS; i++) {
ck_epoch_begin(&record);
ck_epoch_write_begin(&record);
ck_stack_push_upmc(&stack, &entry[i]->stack_entry);
ck_epoch_end(&record);
ck_epoch_begin(&record);
ck_epoch_write_begin(&record);
s = ck_stack_pop_upmc(&stack);
ck_epoch_end(&record);
@ -110,6 +110,20 @@ thread(void *unused CK_CC_UNUSED)
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < n_threads);
fprintf(stderr, "Peak: %u\nReclamations: %" PRIu64 "\n\n",
record.n_peak, record.n_reclamations);
ck_epoch_purge(&record);
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < (n_threads << 1));
if (record.n_pending != 0) {
fprintf(stderr, "ERROR: %u pending, expecting none.\n",
record.n_pending);
exit(EXIT_FAILURE);
}
return (NULL);
}

Loading…
Cancel
Save