ck_epoch: Allow for forward progress in concurrent epoch sections.

This work is derived directly from the work of John Esmet and Paul
Khuong ({jesmet,pkhuong}@appnexus.com) and PR34.
Samy Al Bahra 9 years ago
parent 0d6d384f3f
commit 2e75aefc4a

@@ -43,6 +43,12 @@
#define CK_EPOCH_LENGTH 4
#endif
/*
* This is used for sense detection with respect to concurrent
* epoch sections.
*/
#define CK_EPOCH_SENSE 2
struct ck_epoch_entry;
typedef struct ck_epoch_entry ck_epoch_entry_t;
typedef void ck_epoch_cb_t(ck_epoch_entry_t *);
@@ -56,16 +62,34 @@ struct ck_epoch_entry {
ck_stack_entry_t stack_entry;
};
/*
* A section object may be passed to every begin-end pair to allow for
* forward progress guarantees within prolonged active sections.
*/
struct ck_epoch_section {
unsigned int bucket;
};
typedef struct ck_epoch_section ck_epoch_section_t;
/*
* Return pointer to ck_epoch_entry container object.
*/
#define CK_EPOCH_CONTAINER(T, M, N) CK_CC_CONTAINER(struct ck_epoch_entry, T, M, N)
#define CK_EPOCH_CONTAINER(T, M, N) \
CK_CC_CONTAINER(struct ck_epoch_entry, T, M, N)
struct ck_epoch_ref {
unsigned int epoch;
unsigned int count;
};
struct ck_epoch_record {
struct ck_epoch *global;
unsigned int state;
unsigned int epoch;
unsigned int active;
struct {
struct ck_epoch_ref bucket[CK_EPOCH_SENSE];
} local CK_CC_CACHELINE;
unsigned int n_pending;
unsigned int n_peak;
unsigned long n_dispatch;
@@ -82,11 +106,17 @@ struct ck_epoch {
};
typedef struct ck_epoch ck_epoch_t;
/*
* Internal functions.
*/
void _ck_epoch_addref(ck_epoch_record_t *, ck_epoch_section_t *);
void _ck_epoch_delref(ck_epoch_record_t *, ck_epoch_section_t *);
/*
* Marks the beginning of an epoch-protected section.
*/
CK_CC_INLINE static void
ck_epoch_begin(ck_epoch_record_t *record)
CK_CC_FORCE_INLINE static void
ck_epoch_begin(ck_epoch_record_t *record, ck_epoch_section_t *section)
{
struct ck_epoch *epoch = record->global;
@@ -111,23 +141,29 @@ ck_epoch_begin(ck_epoch_record_t *record)
ck_pr_store_uint(&record->active, 1);
ck_pr_fence_store_load();
#endif
return;
} else {
ck_pr_store_uint(&record->active, record->active + 1);
}
ck_pr_store_uint(&record->active, record->active + 1);
if (section != NULL)
_ck_epoch_addref(record, section);
return;
}
/*
* Marks the end of an epoch-protected section.
*/
CK_CC_INLINE static void
ck_epoch_end(ck_epoch_record_t *record)
CK_CC_FORCE_INLINE static void
ck_epoch_end(ck_epoch_record_t *record, ck_epoch_section_t *section)
{
ck_pr_fence_release();
ck_pr_store_uint(&record->active, record->active - 1);
if (section != NULL)
_ck_epoch_delref(record, section);
return;
}
@@ -136,7 +172,7 @@ ck_epoch_end(ck_epoch_record_t *record)
* argument until an epoch counter loop. This allows for a
* non-blocking deferral.
*/
CK_CC_INLINE static void
CK_CC_FORCE_INLINE static void
ck_epoch_call(ck_epoch_record_t *record,
ck_epoch_entry_t *entry,
ck_epoch_cb_t *function)
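
A usage sketch may help here (illustration only, not part of this commit): it assumes a record that has already been set up with ck_epoch_init() and ck_epoch_register(), as the regressions below do, and a hypothetical struct node whose first member is its ck_epoch_entry_t. Passing a section object to each begin/end pair buys the forward-progress guarantee described above; callers that do not need it, like the existing regressions updated later in this commit, simply pass NULL.

#include <stdlib.h>
#include <ck_epoch.h>

struct node {
	ck_epoch_entry_t epoch_entry;
	int value;
};

static void
node_destroy(ck_epoch_entry_t *entry)
{
	/* epoch_entry is the first member, so a direct cast suffices. */
	struct node *n = (struct node *)entry;

	free(n);
	return;
}

static void
reader(ck_epoch_record_t *record, struct node *n)
{
	ck_epoch_section_t section;

	/*
	 * Enter an epoch-protected section; the section object remembers
	 * which sense bucket this reference was counted against.
	 */
	ck_epoch_begin(record, &section);
	(void)n->value;		/* Read epoch-protected data. */
	ck_epoch_end(record, &section);

	/* Passing NULL retains the previous, section-less behaviour. */
	ck_epoch_begin(record, NULL);
	ck_epoch_end(record, NULL);
	return;
}

static void
writer(ck_epoch_record_t *record, struct node *n)
{

	/* Defer destruction until no active section can still observe n. */
	ck_epoch_call(record, &n->epoch_entry, node_destroy);
	ck_epoch_barrier(record);
	return;
}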

@@ -1,6 +1,7 @@
.PHONY: check clean distribution
OBJECTS=ck_stack ck_epoch_synchronize ck_epoch_poll ck_epoch_call
OBJECTS=ck_stack ck_epoch_synchronize ck_epoch_poll ck_epoch_call \
ck_epoch_section
HALF=`expr $(CORES) / 2`
all: $(OBJECTS)
@@ -9,6 +10,7 @@ check: all
./ck_stack $(CORES) 1
./ck_epoch_synchronize $(HALF) $(HALF) 1
./ck_epoch_poll $(CORES) 1 1
./ck_epoch_section
ck_epoch_synchronize: ck_epoch_synchronize.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_epoch_synchronize ck_epoch_synchronize.c ../../../src/ck_epoch.c
@@ -16,6 +18,9 @@ ck_epoch_synchronize: ck_epoch_synchronize.c ../../../include/ck_stack.h ../../.
ck_epoch_poll: ck_epoch_poll.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_epoch_poll ck_epoch_poll.c ../../../src/ck_epoch.c
ck_epoch_section: ck_epoch_section.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_epoch_section ck_epoch_section.c ../../../src/ck_epoch.c
ck_epoch_call: ck_epoch_call.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_epoch_call ck_epoch_call.c ../../../src/ck_epoch.c

@@ -108,7 +108,7 @@ read_thread(void *unused CK_CC_UNUSED)
j = 0;
for (;;) {
ck_epoch_begin(&record);
ck_epoch_begin(&record, NULL);
CK_STACK_FOREACH(&stack, cursor) {
if (cursor == NULL)
continue;
@@ -116,7 +116,7 @@ read_thread(void *unused CK_CC_UNUSED)
n = CK_STACK_NEXT(cursor);
j += ck_pr_load_ptr(&n) != NULL;
}
ck_epoch_end(&record);
ck_epoch_end(&record, NULL);
if (j != 0 && ck_pr_load_uint(&readers) == 0)
ck_pr_store_uint(&readers, 1);
@@ -178,10 +178,10 @@ write_thread(void *unused CK_CC_UNUSED)
}
for (i = 0; i < PAIRS_S; i++) {
ck_epoch_begin(&record);
ck_epoch_begin(&record, NULL);
s = ck_stack_pop_upmc(&stack);
e = stack_container(s);
ck_epoch_end(&record);
ck_epoch_end(&record, NULL);
ck_epoch_call(&record, &e->epoch_entry, destructor);
ck_epoch_poll(&record);

@@ -0,0 +1,320 @@
/*
* Copyright 2015 John Esmet.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <ck_epoch.h>
static ck_epoch_t epc;
static ck_epoch_record_t record, record2;
static unsigned int cleanup_calls;
static void
setup_test(void)
{
ck_epoch_init(&epc);
ck_epoch_register(&epc, &record);
ck_epoch_register(&epc, &record2);
cleanup_calls = 0;
return;
}
static void
teardown_test(void)
{
memset(&epc, 0, sizeof(ck_epoch_t));
ck_epoch_unregister(&record);
memset(&record, 0, sizeof(ck_epoch_record_t));
memset(&record2, 0, sizeof(ck_epoch_record_t));
cleanup_calls = 0;
return;
}
static void
cleanup(ck_epoch_entry_t *e)
{
(void) e;
cleanup_calls++;
return;
}
static void
test_simple_read_section(void)
{
ck_epoch_entry_t entry;
ck_epoch_section_t section;
memset(&entry, 0, sizeof(ck_epoch_entry_t));
setup_test();
ck_epoch_begin(&record, &section);
ck_epoch_call(&record, &entry, cleanup);
assert(cleanup_calls == 0);
ck_epoch_end(&record, &section);
ck_epoch_barrier(&record);
assert(cleanup_calls == 1);
teardown_test();
printf("%s passed\n", __FUNCTION__);
return;
}
static void
test_nested_read_section(void)
{
ck_epoch_entry_t entry1, entry2;
ck_epoch_section_t section1, section2;
memset(&entry1, 0, sizeof(ck_epoch_entry_t));
memset(&entry2, 0, sizeof(ck_epoch_entry_t));
setup_test();
ck_epoch_begin(&record, &section1);
ck_epoch_call(&record, &entry1, cleanup);
assert(cleanup_calls == 0);
ck_epoch_begin(&record, &section2);
ck_epoch_call(&record, &entry2, cleanup);
assert(cleanup_calls == 0);
ck_epoch_end(&record, &section2);
assert(cleanup_calls == 0);
ck_epoch_end(&record, &section1);
assert(cleanup_calls == 0);
ck_epoch_barrier(&record);
assert(cleanup_calls == 2);
teardown_test();
printf("%s passed\n", __FUNCTION__);
return;
}
struct obj {
ck_epoch_entry_t entry;
unsigned int destroyed;
};
static void *
barrier_work(void *arg)
{
unsigned int *run;
run = (unsigned int *)arg;
while (ck_pr_load_uint(run) != 0) {
/*
* Need to use record2, as record is local
* to the test thread.
*/
ck_epoch_barrier(&record2);
usleep(5 * 1000);
}
return NULL;
}
static void *
reader_work(void *arg)
{
ck_epoch_record_t local_record;
ck_epoch_section_t section;
struct obj *o;
ck_epoch_register(&epc, &local_record);
o = (struct obj *)arg;
/*
* Begin a read section. The calling thread has an open read section,
* so the object should not be destroyed for the lifetime of this
* thread.
*/
ck_epoch_begin(&local_record, &section);
usleep((rand() % 100) * 1000);
assert(ck_pr_load_uint(&o->destroyed) == 0);
ck_epoch_end(&local_record, &section);
ck_epoch_unregister(&local_record);
return NULL;
}
static void
obj_destroy(ck_epoch_entry_t *e)
{
struct obj *o;
o = (struct obj *)e;
ck_pr_fas_uint(&o->destroyed, 1);
return;
}
static void
test_single_reader_with_barrier_thread(void)
{
const int num_sections = 10;
struct obj o;
unsigned int run;
pthread_t thread;
ck_epoch_section_t sections[num_sections];
int shuffled[num_sections];
run = 1;
memset(&o, 0, sizeof(struct obj));
srand(time(NULL));
setup_test();
if (pthread_create(&thread, NULL, barrier_work, &run) != 0) {
abort();
}
/* Start a bunch of sections. */
for (int i = 0; i < num_sections; i++) {
ck_epoch_begin(&record, &sections[i]);
shuffled[i] = i;
if (i == num_sections / 2) {
usleep(1 * 1000);
}
}
/* Generate a shuffle. */
for (int i = num_sections - 1; i >= 0; i--) {
int k = rand() % (i + 1);
int tmp = shuffled[k];
shuffled[k] = shuffled[i];
shuffled[i] = tmp;
}
ck_epoch_call(&record, &o.entry, obj_destroy);
/* Close the sections in shuffle-order. */
for (int i = 0; i < num_sections; i++) {
ck_epoch_end(&record, &sections[shuffled[i]]);
if (i != num_sections - 1) {
assert(ck_pr_load_uint(&o.destroyed) == 0);
usleep(3 * 1000);
}
}
ck_pr_store_uint(&run, 0);
if (pthread_join(thread, NULL) != 0) {
abort();
}
ck_epoch_barrier(&record);
assert(ck_pr_load_uint(&o.destroyed) == 1);
teardown_test();
printf("%s passed\n", __FUNCTION__);
return;
}
static void
test_multiple_readers_with_barrier_thread(void)
{
const int num_readers = 10;
struct obj o;
unsigned int run;
ck_epoch_section_t section;
pthread_t threads[num_readers + 1];
run = 1;
memset(&o, 0, sizeof(struct obj));
memset(&section, 0, sizeof(ck_epoch_section_t));
srand(time(NULL));
setup_test();
/*
* Create a thread to call ck_epoch_barrier() while we create the reader
* threads. Each barrier attempts to move the global epoch forward, which
* makes the read-section code coverage more interesting.
*/
if (pthread_create(&threads[num_readers], NULL,
barrier_work, &run) != 0) {
abort();
}
ck_epoch_begin(&record, &section);
ck_epoch_call(&record, &o.entry, obj_destroy);
for (int i = 0; i < num_readers; i++) {
if (pthread_create(&threads[i], NULL, reader_work, &o) != 0) {
abort();
}
}
ck_epoch_end(&record, &section);
ck_pr_store_uint(&run, 0);
if (pthread_join(threads[num_readers], NULL) != 0) {
abort();
}
/*
* After the barrier, the object should be destroyed and readers
* should return.
*/
for (int i = 0; i < num_readers; i++) {
if (pthread_join(threads[i], NULL) != 0) {
abort();
}
}
teardown_test();
printf("%s passed\n", __FUNCTION__);
return;
}
int
main(void)
{
test_simple_read_section();
test_nested_read_section();
test_single_reader_with_barrier_thread();
test_multiple_readers_with_barrier_thread();
return 0;
}

@@ -109,7 +109,7 @@ read_thread(void *unused CK_CC_UNUSED)
j = 0;
for (;;) {
ck_epoch_begin(&record);
ck_epoch_begin(&record, NULL);
CK_STACK_FOREACH(&stack, cursor) {
if (cursor == NULL)
continue;
@@ -117,7 +117,7 @@ read_thread(void *unused CK_CC_UNUSED)
n = CK_STACK_NEXT(cursor);
j += ck_pr_load_ptr(&n) != NULL;
}
ck_epoch_end(&record);
ck_epoch_end(&record, NULL);
if (j != 0 && ck_pr_load_uint(&readers) == 0)
ck_pr_store_uint(&readers, 1);
@@ -179,10 +179,10 @@ write_thread(void *unused CK_CC_UNUSED)
}
for (i = 0; i < PAIRS_S; i++) {
ck_epoch_begin(&record);
ck_epoch_begin(&record, NULL);
s = ck_stack_pop_upmc(&stack);
e = stack_container(s);
ck_epoch_end(&record);
ck_epoch_end(&record, NULL);
if (i & 1) {
ck_epoch_synchronize(&record);

@@ -104,10 +104,10 @@ thread(void *unused CK_CC_UNUSED)
while (ck_pr_load_uint(&barrier) < n_threads);
for (i = 0; i < PAIRS; i++) {
ck_epoch_begin(&record);
ck_epoch_begin(&record, NULL);
ck_stack_push_upmc(&stack, &entry[i]->stack_entry);
s = ck_stack_pop_upmc(&stack);
ck_epoch_end(&record);
ck_epoch_end(&record, NULL);
e = stack_container(s);
ck_epoch_call(&record, &e->epoch_entry, destructor);

@@ -237,7 +237,7 @@ reader(void *unused)
ck_epoch_register(&epoch_hs, &epoch_record);
for (;;) {
j++;
ck_epoch_begin(&epoch_record);
ck_epoch_begin(&epoch_record, NULL);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
char *r;
@@ -257,7 +257,7 @@ reader(void *unused)
ck_error("ERROR: Found invalid value: [%s] but expected [%s]\n", (char *)r, keys[i]);
}
a += rdtsc() - s;
ck_epoch_end(&epoch_record);
ck_epoch_end(&epoch_record, NULL);
n_state = ck_pr_load_int(&state);
if (n_state != state_previous) {

@@ -224,7 +224,7 @@ reader(void *unused)
ck_epoch_register(&epoch_ht, &epoch_record);
for (;;) {
j++;
ck_epoch_begin(&epoch_record);
ck_epoch_begin(&epoch_record, NULL);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
char *r;
@@ -242,7 +242,7 @@ reader(void *unused)
ck_error("ERROR: Found invalid value: [%s] but expected [%s]\n", r, keys[i]);
}
a += rdtsc() - s;
ck_epoch_end(&epoch_record);
ck_epoch_end(&epoch_record, NULL);
n_state = ck_pr_load_int(&state);
if (n_state != state_previous) {

@@ -224,7 +224,7 @@ ht_reader(void *unused)
ck_epoch_register(&epoch_ht, &epoch_record);
for (;;) {
j++;
ck_epoch_begin(&epoch_record);
ck_epoch_begin(&epoch_record, NULL);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
uintptr_t r;
@@ -243,7 +243,7 @@ ht_reader(void *unused)
(uintmax_t)r);
}
a += rdtsc() - s;
ck_epoch_end(&epoch_record);
ck_epoch_end(&epoch_record, NULL);
n_state = ck_pr_load_int(&state);
if (n_state != state_previous) {

@@ -234,7 +234,7 @@ reader(void *unused)
ck_epoch_register(&epoch_hs, &epoch_record);
for (;;) {
j++;
ck_epoch_begin(&epoch_record);
ck_epoch_begin(&epoch_record, NULL);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
char *r;
@@ -254,7 +254,7 @@ reader(void *unused)
ck_error("ERROR: Found invalid value: [%s] but expected [%s]\n", (char *)r, keys[i]);
}
a += rdtsc() - s;
ck_epoch_end(&epoch_record);
ck_epoch_end(&epoch_record, NULL);
n_state = ck_pr_load_int(&state);
if (n_state != state_previous) {

@@ -36,6 +36,7 @@
#include <ck_pr.h>
#include <ck_stack.h>
#include <ck_stdbool.h>
#include <ck_string.h>
/*
* Only three distinct values are used for reclamation, but reclamation occurs
@@ -136,6 +137,85 @@ CK_STACK_CONTAINER(struct ck_epoch_record, record_next,
CK_STACK_CONTAINER(struct ck_epoch_entry, stack_entry,
ck_epoch_entry_container)
void
_ck_epoch_delref(struct ck_epoch_record *record,
struct ck_epoch_section *section)
{
struct ck_epoch_ref *current;
unsigned int i = section->bucket;
current = &record->local.bucket[i];
current->count--;
/*
* If the current bucket no longer has any references, then
* determine whether we have already transitioned into a newer
* epoch. If so, then make sure to update our shared snapshot
* to allow for forward progress.
*
* If no other active bucket exists, then the record will go
* inactive in order to allow for forward progress.
*/
if (current->count == 0) {
struct ck_epoch_ref *other;
other = &record->local.bucket[(i + 1) & (CK_EPOCH_SENSE - 1)];
if (other->count > 0 &&
((int)(current->epoch - other->epoch) < 0 ||
(current->epoch - other->epoch) > 1)) {
/*
* The other epoch value is actually the newest,
* transition to it.
*/
ck_pr_store_uint(&record->epoch, other->epoch);
}
}
return;
}
void
_ck_epoch_addref(struct ck_epoch_record *record,
struct ck_epoch_section *section)
{
struct ck_epoch *global = record->global;
struct ck_epoch_ref *ref;
unsigned int epoch, i;
epoch = ck_pr_load_uint(&global->epoch);
i = epoch & (CK_EPOCH_SENSE - 1);
ref = &record->local.bucket[i];
if (ref->count++ == 0) {
#ifndef CK_MD_TSO
struct ck_epoch_ref *previous;
/*
* The system has already ticked. If another non-zero bucket
* exists, make sure to order our observations with respect
* to it. Otherwise, it is possible to acquire a reference
* from the previous epoch generation.
*
* On TSO architectures, the monotonicity of the global counter
* and load-{store, load} ordering are sufficient to guarantee
* this ordering.
*/
previous = &record->local.bucket[(i + 1) & (CK_EPOCH_SENSE - 1)];
if (previous->count > 0)
ck_pr_fence_acq_rel();
#endif /* CK_MD_TSO */
/*
* If this is a new reference into the current
* bucket then cache the associated epoch value.
*/
ref->epoch = epoch;
}
section->bucket = i;
return;
}
void
ck_epoch_init(struct ck_epoch *global)
{
@@ -187,6 +267,7 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
record->n_dispatch = 0;
record->n_peak = 0;
record->n_pending = 0;
memset(&record->local, 0, sizeof record->local);
for (i = 0; i < CK_EPOCH_LENGTH; i++)
ck_stack_init(&record->pending[i]);
@@ -207,6 +288,7 @@ ck_epoch_unregister(struct ck_epoch_record *record)
record->n_dispatch = 0;
record->n_peak = 0;
record->n_pending = 0;
memset(&record->local, 0, sizeof record->local);
for (i = 0; i < CK_EPOCH_LENGTH; i++)
ck_stack_init(&record->pending[i]);
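
A worked example of the bucket arithmetic above (an illustrative model, not code from this commit): a section opened while the global epoch is e is counted in bucket e & (CK_EPOCH_SENSE - 1), and once that bucket drains while the other bucket still holds references taken under a newer epoch, the record adopts the newer value so the global counter can keep advancing. The standalone program below walks through one such transition without using the ck_epoch machinery.

#include <stdio.h>

#define CK_EPOCH_SENSE 2

struct ref {
	unsigned int epoch;
	unsigned int count;
};

int
main(void)
{
	struct ref bucket[CK_EPOCH_SENSE] = { { 0, 0 }, { 0, 0 } };
	unsigned int record_epoch = 0;
	unsigned int i, j;

	/* A section begins while the global epoch is 4: bucket 4 & 1 == 0. */
	i = 4 & (CK_EPOCH_SENSE - 1);
	bucket[i].epoch = 4;
	bucket[i].count++;

	/* The global epoch ticks to 5; a nested section lands in bucket 1. */
	j = 5 & (CK_EPOCH_SENSE - 1);
	bucket[j].epoch = 5;
	bucket[j].count++;

	/*
	 * The epoch-4 section ends. Its bucket is now empty while the other
	 * bucket still holds a reference taken under a newer epoch, so the
	 * record adopts that newer epoch, mirroring _ck_epoch_delref above.
	 */
	bucket[i].count--;
	if (bucket[i].count == 0 && bucket[j].count > 0 &&
	    (int)(bucket[i].epoch - bucket[j].epoch) < 0)
		record_epoch = bucket[j].epoch;

	printf("record epoch advanced to %u\n", record_epoch);
	return 0;
}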
