ck_epoch: Major redesign and rewrite.

I had the pleasure of spending a significant amount of time at the most
recent LPC with Mathieu Desnoyers and Paul McKenney. In discussing
RCU semantics in relation to epoch reclamation, it was argued that
epoch reclamation is a specialization of RCU (rather than a generalization).
In light of this discussion, I thought it would make more sense not to expose
write-side synchronization semantics aside from ck_epoch_call (similar to
RCU call), ck_epoch_poll (identical to the old tick), ck_epoch_barrier and
ck_epoch_synchronize (similar to RCU synchronization). Writers will
no longer have to use write-side epoch sections but can instead rely on
ck_epoch_barrier/ck_epoch_synchronize for blocking semantics and ck_epoch_poll
for the old tick semantics.
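
For reference, the public interface after this change (as declared in the
ck_epoch.h diff below; ck_epoch_begin and ck_epoch_end are inline) is:

  void ck_epoch_init(ck_epoch_t *);
  ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
  void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *);
  void ck_epoch_unregister(ck_epoch_record_t *);
  void ck_epoch_begin(ck_epoch_t *, ck_epoch_record_t *);
  void ck_epoch_end(ck_epoch_t *, ck_epoch_record_t *);
  void ck_epoch_call(ck_epoch_record_t *, ck_epoch_entry_t *, ck_epoch_cb_t *);
  bool ck_epoch_poll(ck_epoch_t *, ck_epoch_record_t *);
  void ck_epoch_synchronize(ck_epoch_t *, ck_epoch_record_t *);
  void ck_epoch_barrier(ck_epoch_t *, ck_epoch_record_t *);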

One advantage of this is that we can avoid write-side recursion for certain workloads.
Additionally, for infrequent writes, ck_epoch_barrier and ck_epoch_synchronize both
allow blocking semantics to be used, so you do not have to pay the cost of a
ck_epoch_entry for non-blocking dispatch.

Example usage:

  e = stack_pop(mystack);
  ck_epoch_synchronize(...);
  free(e);
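
The non-blocking counterpart uses ck_epoch_call with ck_epoch_poll. A rough sketch
(my_epoch, my_record, mystack and my_destructor are illustrative names, assuming the
record has already been registered against the epoch object):

  e = stack_pop(mystack);
  ck_epoch_call(&my_record, &e->epoch_entry, my_destructor);
  ck_epoch_poll(&my_epoch, &my_record);

Here my_destructor is a ck_epoch_cb_t that recovers the enclosing object from its
embedded ck_epoch_entry (see CK_EPOCH_CONTAINER) and frees it, and ck_epoch_poll
opportunistically dispatches callbacks whose grace period has passed.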

ck_epoch_read_begin and ck_epoch_read_end have been replaced with ck_epoch_begin
and ck_epoch_end. If multiple writers need SMR guarantees, then they can also use
ck_epoch_begin and ck_epoch_end. Any dispatch in the presence of multiple writers
should be done within an epoch section (for now).
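
A minimal read-side sketch under the new names (my_epoch and my_record are again
placeholder identifiers):

  ck_epoch_register(&my_epoch, &my_record);
  ...
  ck_epoch_begin(&my_epoch, &my_record);
  /* read-side traversal of the shared structure */
  ck_epoch_end(&my_epoch, &my_record);

In the presence of multiple writers, the same ck_epoch_begin/ck_epoch_end pair
would also wrap the ck_epoch_call dispatch itself.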

There are some follow-up commits to come.
ck_pring
Samy Al Bahra 13 years ago
parent 955047a7d1
commit c274e8bc54

@ -38,19 +38,20 @@
#include <ck_stack.h>
#include <stdbool.h>
/*
* CK_EPOCH_LENGTH must be a power of 2.
*/
#ifndef CK_EPOCH_LENGTH
#define CK_EPOCH_LENGTH 4
#endif
struct ck_epoch_entry;
typedef struct ck_epoch_entry ck_epoch_entry_t;
typedef void (*ck_epoch_destructor_t)(ck_epoch_entry_t *);
typedef void ck_epoch_cb_t(ck_epoch_entry_t *);
/*
* This should be embedded into objects you wish to be the target of
* ck_epoch_cb_t functions (with ck_epoch_call).
*/
struct ck_epoch_entry {
ck_epoch_destructor_t destroy;
ck_epoch_cb_t *function;
ck_stack_entry_t stack_entry;
};
@ -59,17 +60,14 @@ struct ck_epoch_entry {
*/
#define CK_EPOCH_CONTAINER(T, M, N) CK_CC_CONTAINER(struct ck_epoch_entry, T, M, N)
struct ck_epoch;
struct ck_epoch_record {
unsigned int active;
unsigned int state;
unsigned int epoch;
ck_stack_t pending[CK_EPOCH_LENGTH];
unsigned int active;
unsigned int n_pending;
unsigned int status;
unsigned int delta;
unsigned int n_peak;
uint64_t n_reclamations;
struct ck_epoch *global;
unsigned long n_dispatch;
ck_stack_t pending[CK_EPOCH_LENGTH];
ck_stack_entry_t record_next;
} CK_CC_CACHELINE;
typedef struct ck_epoch_record ck_epoch_record_t;
@ -78,13 +76,12 @@ struct ck_epoch {
unsigned int epoch;
char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
ck_stack_t records;
unsigned int threshold;
unsigned int n_free;
};
typedef struct ck_epoch ck_epoch_t;
CK_CC_INLINE static void
ck_epoch_read_begin(ck_epoch_record_t *record)
ck_epoch_begin(ck_epoch_t *epoch, ck_epoch_record_t *record)
{
/*
@ -92,8 +89,7 @@ ck_epoch_read_begin(ck_epoch_record_t *record)
* section.
*/
if (record->active == 0) {
unsigned int g_epoch = ck_pr_load_uint(&record->global->epoch);
g_epoch &= CK_EPOCH_LENGTH - 1;
unsigned int g_epoch = ck_pr_load_uint(&epoch->epoch);
ck_pr_store_uint(&record->epoch, g_epoch);
}
@ -103,45 +99,41 @@ ck_epoch_read_begin(ck_epoch_record_t *record)
}
CK_CC_INLINE static void
ck_epoch_read_end(ck_epoch_record_t *record)
ck_epoch_end(ck_epoch_t *global, ck_epoch_record_t *record)
{
ck_pr_fence_load();
ck_pr_store_uint(&record->active, record->active - 1);
return;
}
(void)global;
CK_CC_INLINE static void
ck_epoch_write_end(ck_epoch_record_t *record)
{
ck_pr_fence_store();
ck_pr_fence_memory();
ck_pr_store_uint(&record->active, record->active - 1);
return;
}
/*
* Defers the execution of the function pointed to by the "cb"
* argument until an epoch counter loop. This allows for a
* non-blocking deferral.
*/
CK_CC_INLINE static void
ck_epoch_retire(ck_epoch_record_t *record, ck_epoch_entry_t *entry, ck_epoch_destructor_t destroy)
ck_epoch_call(ck_epoch_record_t *record,
ck_epoch_entry_t *entry,
ck_epoch_cb_t *function)
{
unsigned int offset = record->epoch & (CK_EPOCH_LENGTH - 1);
entry->destroy = destroy;
ck_stack_push_spnc(&record->pending[record->epoch], &entry->stack_entry);
record->n_pending += 1;
if (record->n_pending > record->n_peak)
record->n_peak = record->n_pending;
record->n_pending++;
entry->function = function;
ck_stack_push_spnc(&record->pending[offset], &entry->stack_entry);
return;
}
void ck_epoch_init(ck_epoch_t *, unsigned int);
void ck_epoch_init(ck_epoch_t *);
ck_epoch_record_t *ck_epoch_recycle(ck_epoch_t *);
void ck_epoch_register(ck_epoch_t *, ck_epoch_record_t *);
void ck_epoch_unregister(ck_epoch_record_t *);
void ck_epoch_tick(ck_epoch_t *, ck_epoch_record_t *);
bool ck_epoch_reclaim(ck_epoch_record_t *);
void ck_epoch_write_begin(ck_epoch_record_t *);
void ck_epoch_free(ck_epoch_record_t *, ck_epoch_entry_t *, ck_epoch_destructor_t);
void ck_epoch_purge(ck_epoch_record_t *record);
bool ck_epoch_poll(ck_epoch_t *, ck_epoch_record_t *);
void ck_epoch_call(ck_epoch_record_t *, ck_epoch_entry_t *, ck_epoch_cb_t *);
void ck_epoch_synchronize(ck_epoch_t *, ck_epoch_record_t *);
void ck_epoch_barrier(ck_epoch_t *, ck_epoch_record_t *);
#endif /* _CK_EPOCH_H */

@ -74,7 +74,7 @@ bag_free(void *p, size_t b, bool r)
(void)b;
if (r == true) {
ck_epoch_free(&epoch_wr, &(--e)->epoch_entry, bag_destroy);
ck_epoch_call(&epoch_wr, &(--e)->epoch_entry, bag_destroy);
} else {
free(--e);
}
@ -107,7 +107,7 @@ reader(void *arg)
* guarantee across the bag.
*/
for (;;) {
ck_epoch_read_begin(&epoch_record);
ck_epoch_begin(&epoch_bag, &epoch_record);
ck_bag_iterator_init(&iterator, &bag);
curr_max = prev_max = prev = -1;
block = NULL;
@ -141,8 +141,7 @@ reader(void *arg)
prev = curr;
n_entries++;
}
ck_epoch_read_end(&epoch_record);
ck_epoch_end(&epoch_bag, &epoch_record);
iterations++;
if (ck_pr_load_int(&leave) == 1)
@ -169,7 +168,6 @@ writer_thread(void *unused)
for (;;) {
iteration++;
ck_epoch_write_begin(&epoch_wr);
for (i = 1; i <= writer_max; i++) {
if (ck_bag_put_spmc(&bag, (void *)(uintptr_t)i) == false) {
perror("ck_bag_put_spmc");
@ -206,7 +204,7 @@ writer_thread(void *unused)
}
}
ck_epoch_write_end(&epoch_wr);
ck_epoch_poll(&epoch_bag, &epoch_wr);
}
fprintf(stderr, "Writer %u iterations, %u writes per iteration.\n", iteration, writer_max);
@ -247,7 +245,7 @@ main(int argc, char **argv)
writer_max = (unsigned int)r;
}
ck_epoch_init(&epoch_bag, 100);
ck_epoch_init(&epoch_bag);
ck_epoch_register(&epoch_bag, &epoch_wr);
ck_bag_allocator_set(&allocator, sizeof(struct bag_epoch));
if (ck_bag_init(&bag, b, CK_BAG_ALLOCATE_GEOMETRIC) == false) {

@ -1,15 +1,19 @@
.PHONY: check clean distribution
OBJECTS=ck_stack ck_stack_read
OBJECTS=ck_stack ck_epoch_synchronize ck_epoch_poll
all: $(OBJECTS)
check: all
./ck_stack $(CORES) 10 1
./ck_stack_read $(CORES) 10 1
./ck_stack $(CORES) 1
./ck_epoch_synchronize $(CORES) 1
./ck_epoch_poll $(CORES) 1
ck_stack_read: ck_stack_read.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_stack_read ck_stack_read.c ../../../src/ck_epoch.c
ck_epoch_synchronize: ck_epoch_synchronize.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_epoch_synchronize ck_epoch_synchronize.c ../../../src/ck_epoch.c
ck_epoch_poll: ck_epoch_poll.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_epoch_poll ck_epoch_poll.c ../../../src/ck_epoch.c
ck_stack: ck_stack.c ../../../include/ck_stack.h ../../../include/ck_epoch.h ../../../src/ck_epoch.c
$(CC) $(CFLAGS) -o ck_stack ck_stack.c ../../../src/ck_epoch.c

@ -44,14 +44,13 @@
#include "../../common.h"
static unsigned int threshold;
static unsigned int n_threads;
static unsigned int barrier;
static unsigned int e_barrier;
static unsigned int readers;
#ifndef PAIRS
#define PAIRS 5000000
#define PAIRS 1000000
#endif
#ifndef ITERATE
@ -68,6 +67,7 @@ static ck_epoch_t stack_epoch;
CK_STACK_CONTAINER(struct node, stack_entry, stack_container)
CK_EPOCH_CONTAINER(struct node, epoch_entry, epoch_container)
static struct affinity a;
static const char animate[] = "-/|\\";
static void
destructor(ck_epoch_entry_t *p)
@ -111,13 +111,15 @@ read_thread(void *unused CK_CC_UNUSED)
j = 0;
for (;;) {
ck_epoch_read_begin(&record);
ck_epoch_begin(&stack_epoch, &record);
CK_STACK_FOREACH(&stack, cursor) {
n = cursor;
if (cursor == NULL)
continue;
n = CK_STACK_NEXT(cursor);
j++;
n++;
}
ck_epoch_read_end(&record);
ck_epoch_end(&stack_epoch, &record);
if (j != 0 && ck_pr_load_uint(&readers) == 0)
ck_pr_store_uint(&readers, 1);
@ -168,32 +170,36 @@ thread(void *unused CK_CC_UNUSED)
}
for (i = 0; i < PAIRS; i++) {
ck_epoch_write_begin(&record);
ck_stack_push_upmc(&stack, &entry[i]->stack_entry);
ck_epoch_write_end(&record);
}
while (ck_pr_load_uint(&readers) == 0)
ck_pr_stall();
fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] %2.2f: %c", (double)j / ITERATE, animate[i % strlen(animate)]);
for (i = 0; i < PAIRS; i++) {
ck_epoch_write_begin(&record);
s = ck_stack_pop_upmc(&stack);
ck_epoch_write_end(&record);
e = stack_container(s);
ck_epoch_free(&record, &e->epoch_entry, destructor);
ck_epoch_call(&record, &e->epoch_entry, destructor);
if (i % 1024)
ck_epoch_poll(&stack_epoch, &record);
if (i % 8192)
fprintf(stderr, "\b%c", animate[i % strlen(animate)]);
}
}
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < n_threads);
fprintf(stderr, "[W] Peak: %u (%2.2f%%)\n Reclamations: %" PRIu64 "\n\n",
ck_epoch_synchronize(&stack_epoch, &record);
fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n Reclamations: %lu\n\n",
record.n_peak,
(double)record.n_peak / ((double)PAIRS * ITERATE) * 100,
record.n_reclamations);
record.n_dispatch);
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < n_threads);
return (NULL);
}
@ -203,19 +209,18 @@ main(int argc, char *argv[])
unsigned int i;
pthread_t *threads;
if (argc != 4) {
fprintf(stderr, "Usage: stack <threads> <threshold> <delta>\n");
if (argc != 3) {
fprintf(stderr, "Usage: stack <threads> <affinity delta>\n");
exit(EXIT_FAILURE);
}
n_threads = atoi(argv[1]);
threshold = atoi(argv[2]);
a.delta = atoi(argv[3]);
a.delta = atoi(argv[2]);
a.request = 0;
threads = malloc(sizeof(pthread_t) * n_threads);
ck_epoch_init(&stack_epoch, threshold);
ck_epoch_init(&stack_epoch);
for (i = 0; i < n_threads - 1; i++)
pthread_create(threads + i, NULL, read_thread, NULL);

@ -0,0 +1,246 @@
/*
* Copyright 2010-2012 Samy Al Bahra.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <unistd.h>
#include <sys/time.h>
#include <ck_backoff.h>
#include <ck_cc.h>
#include <ck_pr.h>
#include <stdbool.h>
#include <stddef.h>
#include <ck_epoch.h>
#include <ck_stack.h>
#include "../../common.h"
static unsigned int n_threads;
static unsigned int barrier;
static unsigned int e_barrier;
static unsigned int readers;
#ifndef PAIRS
#define PAIRS 5000000
#endif
#ifndef ITERATE
#define ITERATE 20
#endif
#ifndef PAIRS_S
#define PAIRS_S 10000
#endif
#ifndef ITERATE_S
#define ITERATE_S 20
#endif
struct node {
unsigned int value;
ck_stack_entry_t stack_entry;
ck_epoch_entry_t epoch_entry;
};
static ck_stack_t stack = CK_STACK_INITIALIZER;
static ck_epoch_t stack_epoch;
CK_STACK_CONTAINER(struct node, stack_entry, stack_container)
CK_EPOCH_CONTAINER(struct node, epoch_entry, epoch_container)
static struct affinity a;
static const char animate[] = "-/|\\";
static void
destructor(ck_epoch_entry_t *p)
{
struct node *e = epoch_container(p);
free(e);
return;
}
static void *
read_thread(void *unused CK_CC_UNUSED)
{
unsigned int j;
ck_epoch_record_t record CK_CC_CACHELINE;
ck_stack_entry_t *cursor;
/*
* This is redundantly post-incremented in order to silence some
* irrelevant GCC warnings. It is volatile in order to prevent
* elimination.
*/
volatile ck_stack_entry_t *n;
ck_epoch_register(&stack_epoch, &record);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
exit(EXIT_FAILURE);
}
ck_pr_inc_uint(&barrier);
while (ck_pr_load_uint(&barrier) < n_threads);
while (CK_STACK_ISEMPTY(&stack) == true) {
if (ck_pr_load_uint(&readers) != 0)
break;
ck_pr_stall();
}
j = 0;
for (;;) {
ck_epoch_begin(&stack_epoch, &record);
CK_STACK_FOREACH(&stack, cursor) {
if (cursor == NULL)
continue;
n = CK_STACK_NEXT(cursor);
j++;
}
ck_epoch_end(&stack_epoch, &record);
if (j != 0 && ck_pr_load_uint(&readers) == 0)
ck_pr_store_uint(&readers, 1);
if (CK_STACK_ISEMPTY(&stack) == true &&
ck_pr_load_uint(&e_barrier) != 0)
break;
}
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < n_threads);
fprintf(stderr, "[R] Observed entries: %u\n", j);
return (NULL);
}
static void *
thread(void *unused CK_CC_UNUSED)
{
struct node **entry, *e;
unsigned int i, j;
ck_epoch_record_t record;
ck_stack_entry_t *s;
ck_epoch_register(&stack_epoch, &record);
if (aff_iterate(&a)) {
perror("ERROR: failed to affine thread");
exit(EXIT_FAILURE);
}
ck_pr_inc_uint(&barrier);
while (ck_pr_load_uint(&barrier) < n_threads);
entry = malloc(sizeof(struct node *) * PAIRS_S);
if (entry == NULL) {
fprintf(stderr, "Failed allocation.\n");
exit(EXIT_FAILURE);
}
for (j = 0; j < ITERATE_S; j++) {
for (i = 0; i < PAIRS_S; i++) {
entry[i] = malloc(sizeof(struct node));
if (entry == NULL) {
fprintf(stderr, "Failed individual allocation\n");
exit(EXIT_FAILURE);
}
}
for (i = 0; i < PAIRS_S; i++) {
ck_stack_push_upmc(&stack, &entry[i]->stack_entry);
}
while (ck_pr_load_uint(&readers) == 0)
ck_pr_stall();
fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] %2.2f: %c", (double)j / ITERATE_S, animate[i % strlen(animate)]);
for (i = 0; i < PAIRS_S; i++) {
s = ck_stack_pop_upmc(&stack);
e = stack_container(s);
ck_epoch_synchronize(&stack_epoch, &record);
if (i & 1) {
ck_epoch_call(&record, &e->epoch_entry, destructor);
} else {
destructor(&e->epoch_entry);
}
if (i % 8192) {
fprintf(stderr, "\b%c", animate[i % strlen(animate)]);
}
}
}
ck_epoch_synchronize(&stack_epoch, &record);
fprintf(stderr, "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b[W] Peak: %u (%2.2f%%)\n Reclamations: %lu\n\n",
record.n_peak,
(double)record.n_peak / ((double)PAIRS * ITERATE) * 100,
record.n_dispatch);
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < n_threads);
return (NULL);
}
int
main(int argc, char *argv[])
{
unsigned int i;
pthread_t *threads;
if (argc != 3) {
fprintf(stderr, "Usage: stack <threads> <affinity delta>\n");
exit(EXIT_FAILURE);
}
n_threads = atoi(argv[1]);
a.delta = atoi(argv[2]);
a.request = 0;
threads = malloc(sizeof(pthread_t) * n_threads);
ck_epoch_init(&stack_epoch);
for (i = 0; i < n_threads - 1; i++)
pthread_create(threads + i, NULL, read_thread, NULL);
pthread_create(threads + i, NULL, thread, NULL);
for (i = 0; i < n_threads; i++)
pthread_join(threads[i], NULL);
return (0);
}

@ -44,7 +44,6 @@
#include "../../common.h"
static unsigned int threshold;
static unsigned int n_threads;
static unsigned int barrier;
static unsigned int e_barrier;
@ -77,9 +76,10 @@ static void *
thread(void *unused CK_CC_UNUSED)
{
struct node **entry, *e;
unsigned int i;
ck_epoch_record_t record;
ck_stack_entry_t *s;
unsigned long smr = 0;
unsigned int i;
ck_epoch_register(&stack_epoch, &record);
@ -106,28 +106,27 @@ thread(void *unused CK_CC_UNUSED)
while (ck_pr_load_uint(&barrier) < n_threads);
for (i = 0; i < PAIRS; i++) {
ck_epoch_write_begin(&record);
ck_epoch_begin(&stack_epoch, &record);
ck_stack_push_upmc(&stack, &entry[i]->stack_entry);
ck_epoch_write_end(&record);
ck_epoch_write_begin(&record);
s = ck_stack_pop_upmc(&stack);
ck_epoch_write_end(&record);
ck_epoch_end(&stack_epoch, &record);
e = stack_container(s);
ck_epoch_free(&record, &e->epoch_entry, destructor);
ck_epoch_call(&record, &e->epoch_entry, destructor);
smr += ck_epoch_poll(&stack_epoch, &record) == false;
}
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < n_threads);
fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %" PRIu64 "\n\n",
fprintf(stderr, "Deferrals: %lu (%2.2f)\n", smr, (double)smr / PAIRS);
fprintf(stderr, "Peak: %u (%2.2f%%), %u pending\nReclamations: %lu\n\n",
record.n_peak,
(double)record.n_peak / PAIRS * 100,
record.n_pending,
record.n_reclamations);
record.n_dispatch);
ck_epoch_purge(&record);
ck_epoch_synchronize(&stack_epoch, &record);
ck_pr_inc_uint(&e_barrier);
while (ck_pr_load_uint(&e_barrier) < (n_threads << 1));
@ -146,19 +145,18 @@ main(int argc, char *argv[])
unsigned int i;
pthread_t *threads;
if (argc != 4) {
fprintf(stderr, "Usage: stack <threads> <threshold> <delta>\n");
if (argc != 3) {
fprintf(stderr, "Usage: stack <threads> <affinity delta>\n");
exit(EXIT_FAILURE);
}
n_threads = atoi(argv[1]);
threshold = atoi(argv[2]);
a.delta = atoi(argv[3]);
a.delta = atoi(argv[2]);
a.request = 0;
threads = malloc(sizeof(pthread_t) * n_threads);
ck_epoch_init(&stack_epoch, threshold);
ck_epoch_init(&stack_epoch);
for (i = 0; i < n_threads; i++)
pthread_create(threads + i, NULL, thread, NULL);

@ -106,7 +106,7 @@ ht_free(void *p, size_t b, bool r)
if (r == true) {
/* Destruction requires safe memory reclamation. */
ck_epoch_free(&epoch_wr, &(--e)->epoch_entry, ht_destroy);
ck_epoch_call(&epoch_wr, &(--e)->epoch_entry, ht_destroy);
} else {
free(--e);
}
@ -123,7 +123,7 @@ static void
table_init(void)
{
ck_epoch_init(&epoch_ht, 10);
ck_epoch_init(&epoch_ht);
ck_epoch_register(&epoch_ht, &epoch_wr);
srand48((long int)time(NULL));
if (ck_ht_init(&ht, CK_HT_MODE_BYTESTRING, NULL, &my_allocator, 8, lrand48()) == false) {
@ -200,7 +200,7 @@ table_reset(void)
}
static void *
ht_reader(void *unused)
reader(void *unused)
{
size_t i;
ck_epoch_record_t epoch_record;
@ -216,7 +216,7 @@ ht_reader(void *unused)
ck_epoch_register(&epoch_ht, &epoch_record);
for (;;) {
j++;
ck_epoch_read_begin(&epoch_record);
ck_epoch_begin(&epoch_ht, &epoch_record);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
char *r;
@ -235,7 +235,7 @@ ht_reader(void *unused)
exit(EXIT_FAILURE);
}
a += rdtsc() - s;
ck_epoch_read_end(&epoch_record);
ck_epoch_end(&epoch_ht, &epoch_record);
n_state = ck_pr_load_int(&state);
if (n_state != state_previous) {
@ -335,7 +335,7 @@ main(int argc, char *argv[])
table_init();
for (i = 0; i < (size_t)n_threads; i++) {
if (pthread_create(&readers[i], NULL, ht_reader, NULL) != 0) {
if (pthread_create(&readers[i], NULL, reader, NULL) != 0) {
fprintf(stderr, "ERROR: Failed to create thread %zu.\n", i);
exit(EXIT_FAILURE);
}
@ -352,7 +352,6 @@ main(int argc, char *argv[])
fprintf(stderr, " | Executing SMR test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_write_begin(&epoch_wr);
if (table_reset() == false) {
fprintf(stderr, "ERROR: Failed to reset hash table.\n");
exit(EXIT_FAILURE);
@ -363,27 +362,23 @@ main(int argc, char *argv[])
d += table_insert(keys[i]) == false;
e = rdtsc();
a += e - s;
ck_epoch_write_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
fprintf(stderr, " | Executing replacement test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_write_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++)
table_replace(keys[i]);
e = rdtsc();
a += e - s;
ck_epoch_write_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
fprintf(stderr, " | Executing get test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_read_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
if (table_get(keys[i]) == NULL) {
@ -393,14 +388,12 @@ main(int argc, char *argv[])
}
e = rdtsc();
a += e - s;
ck_epoch_read_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
a = 0;
fprintf(stderr, " | Executing removal test...");
for (j = 0; j < r; j++) {
ck_epoch_write_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++)
table_remove(keys[i]);
@ -409,31 +402,28 @@ main(int argc, char *argv[])
for (i = 0; i < keys_length; i++)
table_insert(keys[i]);
ck_epoch_write_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
fprintf(stderr, " | Executing negative look-up test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_read_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
table_get("\x50\x03\x04\x05\x06\x10");
}
e = rdtsc();
a += e - s;
ck_epoch_read_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
ck_epoch_record_t epoch_temporary = epoch_wr;
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> "
"%u pending, %u peak, %" PRIu64 " reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations);
fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> "
"%u pending, %u peak, %lu reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch);
fprintf(stderr, " ,- READER CONCURRENCY\n");
fprintf(stderr, " | Executing reader test...");
@ -474,7 +464,7 @@ main(int argc, char *argv[])
while (ck_pr_load_int(&barrier[HT_STATE_STRICT_REPLACEMENT]) != n_threads)
ck_pr_stall();
table_reset();
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n",
a / (repeated * keys_length), accumulator[HT_STATE_STRICT_REPLACEMENT] / n_threads);
@ -510,7 +500,7 @@ main(int argc, char *argv[])
ck_pr_stall();
table_reset();
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n",
a / (repeated * keys_length), accumulator[HT_STATE_DELETION] / n_threads);
@ -550,18 +540,18 @@ main(int argc, char *argv[])
while (ck_pr_load_int(&barrier[HT_STATE_REPLACEMENT]) != n_threads)
ck_pr_stall();
table_reset();
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n",
a / (repeated * keys_length), accumulator[HT_STATE_REPLACEMENT] / n_threads);
ck_pr_inc_int(&barrier[HT_STATE_REPLACEMENT]);
epoch_temporary = epoch_wr;
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> "
"%u pending, %u peak, %" PRIu64 " reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations);
fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> "
"%u pending, %u peak, %lu reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch);
return 0;
}
#else

@ -105,7 +105,7 @@ ht_free(void *p, size_t b, bool r)
if (r == true) {
/* Destruction requires safe memory reclamation. */
ck_epoch_free(&epoch_wr, &(--e)->epoch_entry, ht_destroy);
ck_epoch_call(&epoch_wr, &(--e)->epoch_entry, ht_destroy);
} else {
free(--e);
}
@ -133,7 +133,7 @@ static void
table_init(void)
{
ck_epoch_init(&epoch_ht, 10);
ck_epoch_init(&epoch_ht);
ck_epoch_register(&epoch_ht, &epoch_wr);
srand48((long int)time(NULL));
if (ck_ht_init(&ht, CK_HT_MODE_DIRECT, hash_function, &my_allocator, 8, lrand48()) == false) {
@ -222,7 +222,7 @@ ht_reader(void *unused)
ck_epoch_register(&epoch_ht, &epoch_record);
for (;;) {
j++;
ck_epoch_read_begin(&epoch_record);
ck_epoch_begin(&epoch_ht, &epoch_record);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
uintptr_t r;
@ -242,7 +242,7 @@ ht_reader(void *unused)
exit(EXIT_FAILURE);
}
a += rdtsc() - s;
ck_epoch_read_end(&epoch_record);
ck_epoch_end(&epoch_ht, &epoch_record);
n_state = ck_pr_load_int(&state);
if (n_state != state_previous) {
@ -343,7 +343,6 @@ main(int argc, char *argv[])
fprintf(stderr, " | Executing SMR test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_write_begin(&epoch_wr);
if (table_reset() == false) {
fprintf(stderr, "ERROR: Failed to reset hash table.\n");
exit(EXIT_FAILURE);
@ -354,27 +353,23 @@ main(int argc, char *argv[])
d += table_insert(keys[i]) == false;
e = rdtsc();
a += e - s;
ck_epoch_write_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
fprintf(stderr, " | Executing replacement test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_write_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++)
table_replace(keys[i]);
e = rdtsc();
a += e - s;
ck_epoch_write_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
fprintf(stderr, " | Executing get test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_read_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
if (table_get(keys[i]) == 0) {
@ -384,14 +379,12 @@ main(int argc, char *argv[])
}
e = rdtsc();
a += e - s;
ck_epoch_read_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
a = 0;
fprintf(stderr, " | Executing removal test...");
for (j = 0; j < r; j++) {
ck_epoch_write_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++)
table_remove(keys[i]);
@ -400,31 +393,28 @@ main(int argc, char *argv[])
for (i = 0; i < keys_length; i++)
table_insert(keys[i]);
ck_epoch_write_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
fprintf(stderr, " | Executing negative look-up test...");
a = 0;
for (j = 0; j < r; j++) {
ck_epoch_read_begin(&epoch_wr);
s = rdtsc();
for (i = 0; i < keys_length; i++) {
table_get(2);
}
e = rdtsc();
a += e - s;
ck_epoch_read_end(&epoch_wr);
}
fprintf(stderr, "done (%" PRIu64 " ticks)\n", a / (r * keys_length));
ck_epoch_record_t epoch_temporary = epoch_wr;
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> "
"%u pending, %u peak, %" PRIu64 " reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations);
fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> "
"%u pending, %u peak, %lu reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch);
fprintf(stderr, " ,- READER CONCURRENCY\n");
fprintf(stderr, " | Executing reader test...");
@ -465,7 +455,7 @@ main(int argc, char *argv[])
while (ck_pr_load_int(&barrier[HT_STATE_STRICT_REPLACEMENT]) != n_threads)
ck_pr_stall();
table_reset();
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n",
a / (repeated * keys_length), accumulator[HT_STATE_STRICT_REPLACEMENT] / n_threads);
@ -501,7 +491,7 @@ main(int argc, char *argv[])
ck_pr_stall();
table_reset();
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n",
a / (repeated * keys_length), accumulator[HT_STATE_DELETION] / n_threads);
@ -541,18 +531,18 @@ main(int argc, char *argv[])
while (ck_pr_load_int(&barrier[HT_STATE_REPLACEMENT]) != n_threads)
ck_pr_stall();
table_reset();
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, "done (writer = %" PRIu64 " ticks, reader = %" PRIu64 " ticks)\n",
a / (repeated * keys_length), accumulator[HT_STATE_REPLACEMENT] / n_threads);
ck_pr_inc_int(&barrier[HT_STATE_REPLACEMENT]);
epoch_temporary = epoch_wr;
ck_epoch_purge(&epoch_wr);
ck_epoch_synchronize(&epoch_ht, &epoch_wr);
fprintf(stderr, " '- Summary: %u pending, %u peak, %" PRIu64 " reclamations -> "
"%u pending, %u peak, %" PRIu64 " reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_reclamations,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_reclamations);
fprintf(stderr, " '- Summary: %u pending, %u peak, %lu reclamations -> "
"%u pending, %u peak, %lu reclamations\n\n",
epoch_temporary.n_pending, epoch_temporary.n_peak, epoch_temporary.n_dispatch,
epoch_wr.n_pending, epoch_wr.n_peak, epoch_wr.n_dispatch);
return 0;
}
#else

@ -37,22 +37,114 @@
#include <ck_stack.h>
#include <stdbool.h>
/*
* Only three distinct epoch values are needed. If any thread is in
* a "critical section" then it would have acquired some snapshot (e)
* of the global epoch value (e_g) and set an active flag. Any hazardous
* references will only occur after a full memory barrier. For example,
* assume an initial e_g value of 1, e value of 0 and active value of 0.
*
* ck_epoch_begin(...)
* e = e_g
* active = 1
* memory_barrier();
*
* Any serialized reads may observe e = 0 or e = 1 with active = 0, or
* e = 0 or e = 1 with active = 1. The e_g value can only go from 1
* to 2 if every thread has already observed the value of "1" (or the
* value we are incrementing from). This guarantees us that for any
* given value e_g, any threads within critical sections (referred
* to as "active" threads from here on) would have an e value of
* e_g - 1 or e_g. This also means that hazardous references may be
* shared in both e_g - 1 and e_g even if they are logically deleted
* in e_g.
*
* For example, assume all threads have an e value of e_g. Another
* thread may increment e_g to e_g + 1. Older threads may have
* a reference to an object which is only deleted in e_g + 1. It
* could be that reader threads are executing some hash table look-ups,
* while some other writer thread (which causes epoch counter tick)
* actually deletes the same items that reader threads are looking
* up (this writer thread having an e value of e_g + 1). This is possible
* if the writer thread re-observes the epoch after the counter tick.
*
* Pseudo-code for writer:
* ck_epoch_begin()
* ht_delete(x)
* ck_epoch_end()
* ck_epoch_begin()
* ht_delete(x)
* ck_epoch_end()
*
* Pseudo-code for reader:
* for (;;) {
* x = ht_lookup(x)
* ck_pr_inc(&x->value);
* }
*
* Of course, it is also possible for references logically deleted
* at e_g - 1 to still be accessed at e_g as threads are "active"
* at the same time (real-world time) mutating shared objects.
*
* Now, if the epoch counter is ticked to e_g + 1, then no new
* hazardous references could exist to objects logically deleted at
* e_g - 1. The reason for this is that at e_g + 1, all epoch read-side
* critical sections started at e_g - 1 must have been completed. If
* any epoch read-side critical sections at e_g - 1 were still active,
* then we would never increment to e_g + 1 (active != 0 ^ e != e_g).
* Additionally, e_g may still have hazardous references to objects logically
* deleted at e_g - 1 which means objects logically deleted at e_g - 1 cannot
* be deleted at e_g + 1 (since it is valid for active threads to be at e_g or
* e_g + 1 and threads at e_g still require safe memory accesses).
*
* However, at e_g + 2, all active threads must be either at e_g + 1 or
* e_g + 2. Though e_g + 2 may share hazardous references with e_g + 1,
* and e_g + 1 shares hazardous references to e_g, no active threads are
* at e_g or e_g - 1. This means no hazardous references could exist to
* objects deleted at e_g - 1 (at e_g + 2).
*
* To summarize these important points,
* 1) Active threads will always have a value of e_g or e_g - 1.
* 2) Items that are logically deleted at e_g or e_g - 1 cannot be
* physically deleted.
* 3) Objects logically deleted at e_g - 1 can be physically destroyed
* at e_g + 2. In other words, for any current value of the global epoch
* counter e_g, objects logically deleted at e_g can be physically
* deleted at e_g + 3.
*
* Last but not least, if we are at e_g + 2, then no active thread is at
* e_g which means it is safe to apply modulo-3 arithmetic to e_g value
* in order to re-use e_g to represent the e_g + 3 state. This means it is
* sufficient to represent e_g using only the values 0, 1 or 2. Every time
* a thread re-visits an e_g (which can be determined with a non-empty deferral
* list) it can assume objects in the e_g deferral list involved at least
* three e_g transitions and are thus safe for physical deletion.
*
* Blocking semantics for epoch reclamation have additional restrictions.
* Though we only require three deferral lists, reasonable blocking semantics
* must be able to more gracefully handle bursty write work-loads which could
* easily cause e_g wrap-around if modulo-3 arithmetic is used. This allows for
* easy-to-trigger live-lock situations. The work-around is to not apply modulo
* arithmetic to e_g itself but only to deferral list
* indexing.
*/
#define CK_EPOCH_GRACE 3U
enum {
CK_EPOCH_USED = 0,
CK_EPOCH_FREE = 1
CK_EPOCH_STATE_USED = 0,
CK_EPOCH_STATE_FREE = 1
};
CK_STACK_CONTAINER(struct ck_epoch_record, record_next, ck_epoch_record_container)
CK_STACK_CONTAINER(struct ck_epoch_entry, stack_entry, ck_epoch_entry_container)
void
ck_epoch_init(struct ck_epoch *global, unsigned int threshold)
ck_epoch_init(struct ck_epoch *global)
{
ck_stack_init(&global->records);
global->epoch = 1;
global->n_free = 0;
global->threshold = threshold;
ck_pr_fence_store();
return;
}
@ -62,7 +154,7 @@ ck_epoch_recycle(struct ck_epoch *global)
{
struct ck_epoch_record *record;
ck_stack_entry_t *cursor;
unsigned int status;
unsigned int state;
if (ck_pr_load_uint(&global->n_free) == 0)
return (NULL);
@ -70,10 +162,10 @@ ck_epoch_recycle(struct ck_epoch *global)
CK_STACK_FOREACH(&global->records, cursor) {
record = ck_epoch_record_container(cursor);
if (ck_pr_load_uint(&record->status) == CK_EPOCH_FREE) {
if (ck_pr_load_uint(&record->state) == CK_EPOCH_STATE_FREE) {
ck_pr_fence_load();
status = ck_pr_fas_uint(&record->status, CK_EPOCH_USED);
if (status == CK_EPOCH_FREE) {
state = ck_pr_fas_uint(&record->state, CK_EPOCH_STATE_USED);
if (state == CK_EPOCH_STATE_FREE) {
ck_pr_dec_uint(&global->n_free);
return record;
}
@ -88,14 +180,12 @@ ck_epoch_register(struct ck_epoch *global, struct ck_epoch_record *record)
{
size_t i;
record->status = CK_EPOCH_USED;
record->state = CK_EPOCH_STATE_USED;
record->active = 0;
record->epoch = 0;
record->delta = 0;
record->n_pending = 0;
record->n_dispatch = 0;
record->n_peak = 0;
record->n_reclamations = 0;
record->global = global;
record->n_pending = 0;
for (i = 0; i < CK_EPOCH_LENGTH; i++)
ck_stack_init(&record->pending[i]);
@ -112,151 +202,148 @@ ck_epoch_unregister(struct ck_epoch_record *record)
record->active = 0;
record->epoch = 0;
record->delta = 0;
record->n_pending = 0;
record->n_dispatch = 0;
record->n_peak = 0;
record->n_reclamations = 0;
record->n_pending = 0;
for (i = 0; i < CK_EPOCH_LENGTH; i++)
ck_stack_init(&record->pending[i]);
ck_pr_fence_store();
ck_pr_store_uint(&record->status, CK_EPOCH_FREE);
ck_pr_inc_uint(&record->global->n_free);
ck_pr_store_uint(&record->state, CK_EPOCH_STATE_FREE);
return;
}
void
ck_epoch_tick(struct ck_epoch *global, struct ck_epoch_record *record)
static struct ck_epoch_record *
ck_epoch_scan(struct ck_epoch *global, struct ck_epoch_record *cr, unsigned int epoch)
{
struct ck_epoch_record *c_record;
ck_stack_entry_t *cursor;
unsigned int g_epoch = ck_pr_load_uint(&global->epoch);
CK_STACK_FOREACH(&global->records, cursor) {
c_record = ck_epoch_record_container(cursor);
if (ck_pr_load_uint(&c_record->status) == CK_EPOCH_FREE ||
c_record == record)
if (cr == NULL) {
cursor = CK_STACK_FIRST(&global->records);
} else {
cursor = &cr->record_next;
}
while (cursor != NULL) {
unsigned int state;
cr = ck_epoch_record_container(cursor);
state = ck_pr_load_uint(&cr->state);
if (state & CK_EPOCH_STATE_FREE)
continue;
if (ck_pr_load_uint(&c_record->active) != 0 &&
ck_pr_load_uint(&c_record->epoch) != g_epoch)
return;
if (ck_pr_load_uint(&cr->active) != 0 &&
ck_pr_load_uint(&cr->epoch) != epoch)
return cr;
cursor = CK_STACK_NEXT(cursor);
}
/*
* If we have multiple writers, it is much easier to starve
* reclamation if we loop through the epoch domain. It may
* be worth it to add an SPMC variant to ck_epoch that relies
* on atomic increment operations instead.
*/
ck_pr_cas_uint(&global->epoch, g_epoch, (g_epoch + 1) & (CK_EPOCH_LENGTH - 1));
return;
return NULL;
}
bool
ck_epoch_reclaim(struct ck_epoch_record *record)
static void
ck_epoch_dispatch(struct ck_epoch_record *record, unsigned int e)
{
struct ck_epoch *global = record->global;
unsigned int g_epoch = ck_pr_load_uint(&global->epoch);
unsigned int epoch = record->epoch;
unsigned int epoch = e & (CK_EPOCH_LENGTH - 1);
ck_stack_entry_t *next, *cursor;
unsigned int i = 0;
if (epoch == g_epoch)
return false;
/*
* This means all threads with a potential reference to a
* hazard pointer will have a view as new as or newer than
* the calling thread. No active reference should exist to
* any object in the record's pending list.
*/
CK_STACK_FOREACH_SAFE(&record->pending[g_epoch], cursor, next) {
CK_STACK_FOREACH_SAFE(&record->pending[epoch], cursor, next) {
struct ck_epoch_entry *entry = ck_epoch_entry_container(cursor);
entry->destroy(entry);
record->n_pending--;
record->n_reclamations++;
entry->function(entry);
i++;
}
ck_stack_init(&record->pending[g_epoch]);
ck_pr_store_uint(&record->epoch, g_epoch);
record->delta = 0;
return true;
if (record->n_pending > record->n_peak)
record->n_peak = record->n_pending;
record->n_dispatch += i;
record->n_pending -= i;
ck_stack_init(&record->pending[epoch]);
return;
}
/*
* This function must not be called within a read section.
*/
void
ck_epoch_write_begin(struct ck_epoch_record *record)
ck_epoch_barrier(struct ck_epoch *global, struct ck_epoch_record *record)
{
struct ck_epoch *global = record->global;
ck_pr_store_uint(&record->active, record->active + 1);
struct ck_epoch_record *cr;
unsigned int delta, epoch, goal, i;
/*
* In the case of recursive write sections, avoid ticking
* over global epoch.
* Guarantee any mutations previous to the barrier will be made visible
* with respect to epoch snapshots we will read.
*/
if (record->active > 1)
return;
ck_pr_fence_memory();
for (;;) {
/*
* Reclaim deferred objects if possible and
* acquire a new snapshot of the global epoch.
*/
if (ck_epoch_reclaim(record) == true)
break;
delta = epoch = ck_pr_load_uint(&global->epoch);
goal = epoch + CK_EPOCH_GRACE;
for (i = 0, cr = NULL; i < CK_EPOCH_GRACE; cr = NULL, i++) {
/* Determine whether all threads have observed the current epoch. */
while (cr = ck_epoch_scan(global, cr, delta), cr != NULL)
ck_pr_stall();
/*
* If we are above the global epoch record threshold,
* attempt to tick over the global epoch counter.
* Increment current epoch. CAS semantics are used to eliminate
* increment operations for synchronization that occurs for the
* same global epoch value snapshot.
*
* If we can guarantee there will only be one active barrier
* or epoch tick at a given time, then it is sufficient to
* use an increment operation. In a multi-barrier workload,
* however, it is possible to overflow the epoch value if we
* apply modulo-3 arithmetic.
*/
if (++record->delta >= global->threshold) {
record->delta = 0;
ck_epoch_tick(global, record);
continue;
}
ck_pr_cas_uint_value(&global->epoch, delta, delta + 1, &delta);
break;
/* Right now, epoch overflow is handled as an edge case. */
if ((goal > epoch) & (delta > goal))
break;
}
/*
* As the synchronize operation is non-blocking, it is possible other
* writers have already observed three or more epoch generations
* relative to the generation the caller has observed. In this case,
* it is safe to assume we are also in a grace period and are able to
* dispatch all calls across all lists.
*/
for (epoch = 0; epoch < CK_EPOCH_LENGTH; epoch++)
ck_epoch_dispatch(record, epoch);
record->epoch = delta;
return;
}
void
ck_epoch_free(struct ck_epoch_record *record,
ck_epoch_entry_t *entry,
ck_epoch_destructor_t destroy)
ck_epoch_synchronize(struct ck_epoch *global, struct ck_epoch_record *record)
{
unsigned int epoch = ck_pr_load_uint(&record->epoch);
struct ck_epoch *global = record->global;
entry->destroy = destroy;
ck_stack_push_spnc(&record->pending[epoch], &entry->stack_entry);
record->n_pending += 1;
if (record->n_pending > record->n_peak)
record->n_peak = record->n_pending;
if (record->n_pending >= global->threshold && ck_epoch_reclaim(record) == false)
ck_epoch_tick(global, record);
ck_epoch_barrier(global, record);
return;
}
void
ck_epoch_purge(struct ck_epoch_record *record)
bool
ck_epoch_poll(struct ck_epoch *global, struct ck_epoch_record *record)
{
ck_backoff_t backoff = CK_BACKOFF_INITIALIZER;
unsigned int epoch = ck_pr_load_uint(&global->epoch);
unsigned int snapshot;
struct ck_epoch_record *cr = NULL;
while (record->n_pending > 0) {
ck_epoch_reclaim(record);
ck_epoch_tick(record->global, record);
if (record->n_pending > 0)
ck_backoff_gb(&backoff);
}
cr = ck_epoch_scan(global, cr, epoch);
if (cr != NULL)
return false;
return;
ck_pr_cas_uint_value(&global->epoch, epoch, epoch + 1, &snapshot);
ck_epoch_dispatch(record, epoch + 1);
record->epoch = snapshot;
return true;
}
