ck_rhs: Add a read mostly mode.

Add a read-mostly mode, in which entries won't share cache lines with
associated data (probes, probe_bound, etc.).
This makes write operations slower, but makes get faster.
ck_pring
User Doginou 11 years ago
parent 0f908f1e31
commit 59cedf10c6

@ -55,6 +55,12 @@
*/
#define CK_RHS_MODE_OBJECT 8
/*
* Indicates that the workload is read-mostly, so get should be optimized
* over put and delete.
*/
#define CK_RHS_MODE_READ_MOSTLY 16
/* Currently unsupported. */
#define CK_RHS_MODE_MPMC (void)

@ -507,6 +507,7 @@ main(int argc, char *argv[])
global_seed = common_lrand48();
run_test(argv[1], r, size, 0);
run_test(argv[1], r, size, CK_RHS_MODE_READ_MOSTLY);
fprintf(stderr, "# reverse_insertion serial_insertion random_insertion serial_swap "
"serial_replace reverse_get serial_get random_get serial_remove negative_get tombstone "
"set_unique gc rebuild\n\n");

@ -84,15 +84,31 @@ enum ck_rhs_probe_behavior {
CK_RHS_PROBE_ROBIN_HOOD, /* Look for the first slot available for the entry we are about to replace, only used to internally implement Robin Hood */
CK_RHS_PROBE_NO_RH, /* Don't do the RH dance */
};
struct ck_rhs_entry_desc {
void *entry;
unsigned int probes;
unsigned short wanted;
CK_RHS_WORD probe_bound;
bool in_rh;
void *entry;
} __attribute__ ((__aligned__(16)));
/*
 * Slot metadata used in read-mostly mode. It is kept in an array separate
 * from the entry pointers, so lookups touch only the (denser) entries array
 * while writers pay for the extra indirection.
 */
struct ck_rhs_no_entry_desc {
	unsigned int probes;		/* Probe-sequence length of the entry occupying this slot. */
	unsigned short wanted;		/* Count of entries whose probe sequence passed through this slot. */
	CK_RHS_WORD probe_bound;	/* Per-bucket upper bound on probe length. */
	bool in_rh;			/* Slot is currently part of a Robin Hood displacement. */
} __attribute__ ((__aligned__(8)));
/*
 * Probe-function signature. A map dispatches through this pointer to either
 * the default-layout probe routine or the read-mostly variant (selected at
 * map-creation time), so hot paths avoid re-testing the mode.
 */
typedef long (*probe_func) (struct ck_rhs *hs,
	struct ck_rhs_map *map,
	unsigned long *n_probes,
	long *priority,
	unsigned long h,
	const void *key,
	void **object,
	unsigned long probe_limit,
	enum ck_rhs_probe_behavior behavior);
struct ck_rhs_map {
unsigned int generation[CK_RHS_G];
unsigned int probe_maximum;
@ -103,9 +119,88 @@ struct ck_rhs_map {
unsigned long capacity;
unsigned long size;
char offset_mask;
struct ck_rhs_entry_desc *descs;
union {
struct ck_rhs_entry_desc *descs;
struct ck_rhs_no_entry {
void **entries;
struct ck_rhs_no_entry_desc *descs;
} no_entries;
} entries;
bool read_mostly;
probe_func probe_func;
};
#define CK_RHS_ENTRY(map, offset) (map->read_mostly ? map->entries.no_entries.entries[offset] : map->entries.descs[offset].entry)
#define CK_RHS_ENTRY_ADDR(map, offset) (map->read_mostly ? &map->entries.no_entries.entries[offset] : &map->entries.descs[offset].entry)
#define CK_RHS_DESC(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? (struct ck_rhs_entry_desc *)(void *)&map->entries.no_entries.descs[offset] : &map->entries.descs[offset])
#define CK_RHS_WANTED_INC(map, offset) do { \
if (CK_CC_UNLIKELY(map->read_mostly)) \
map->entries.no_entries.descs[offset].wanted++; \
else \
map->entries.descs[offset].wanted++; \
} while (0)
/* Read the "wanted" counter for the slot, from the layout-appropriate descriptor. */
#define CK_RHS_WANTED(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? map->entries.no_entries.descs[offset].wanted : map->entries.descs[offset].wanted)
/*
 * Set the "wanted" counter for the slot at offset.
 * Fix: the default-layout branch previously wrote to .probes instead of
 * .wanted, corrupting the probe count of the slot.
 */
#define CK_RHS_SET_WANTED(map, offset, value) do { \
	if (CK_CC_UNLIKELY(map->read_mostly)) \
		map->entries.no_entries.descs[offset].wanted = value; \
	else \
		map->entries.descs[offset].wanted = value; \
} while (0)
#define CK_RHS_WANTED_DEC(map, offset) do { \
if (CK_CC_UNLIKELY(map->read_mostly)) \
map->entries.no_entries.descs[offset].wanted--; \
else \
map->entries.descs[offset].wanted--; \
} while (0)
#define CK_RHS_PROBES(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? map->entries.no_entries.descs[offset].probes : map->entries.descs[offset].probes)
#define CK_RHS_SET_PROBES(map, offset, value) do { \
if (CK_CC_UNLIKELY(map->read_mostly)) \
map->entries.no_entries.descs[offset].probes = value; \
else \
map->entries.descs[offset].probes = value; \
} while (0)
#define CK_RHS_PROBE_BOUND(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? map->entries.no_entries.descs[offset].probe_bound : map->entries.descs[offset].probe_bound)
#define CK_RHS_PROBE_BOUND_ADDR(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? (&map->entries.no_entries.descs[offset].probe_bound) : (&map->entries.descs[offset].probe_bound))
#define CK_RHS_IN_RH(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? map->entries.no_entries.descs[offset].in_rh : map->entries.descs[offset].in_rh)
#define CK_RHS_SET_RH(map, offset) do { \
if (CK_CC_UNLIKELY(map->read_mostly)) \
map->entries.no_entries.descs[offset].in_rh = true; \
else \
map->entries.descs[offset].in_rh = true; \
} while (0)
#define CK_RHS_UNSET_RH(map, offset) do { \
if (CK_CC_UNLIKELY(map->read_mostly)) \
map->entries.no_entries.descs[offset].in_rh = false; \
else \
map->entries.descs[offset].in_rh = false; \
} while (0)
#define CK_RHS_LOAD_FACTOR 50
static long
ck_rhs_map_probe(struct ck_rhs *hs,
struct ck_rhs_map *map,
unsigned long *n_probes,
long *priority,
unsigned long h,
const void *key,
void **object,
unsigned long probe_limit,
enum ck_rhs_probe_behavior behavior);
static long
ck_rhs_map_probe_rm(struct ck_rhs *hs,
struct ck_rhs_map *map,
unsigned long *n_probes,
long *priority,
unsigned long h,
const void *key,
void **object,
unsigned long probe_limit,
enum ck_rhs_probe_behavior behavior);
void
ck_rhs_iterator_init(struct ck_rhs_iterator *iterator)
{
@ -125,7 +220,7 @@ ck_rhs_next(struct ck_rhs *hs, struct ck_rhs_iterator *i, void **key)
return false;
do {
value = map->descs[i->offset].entry;
value = CK_RHS_ENTRY(map, i->offset);
if (value != CK_RHS_EMPTY) {
#ifdef CK_RHS_PP
if (hs->mode & CK_RHS_MODE_OBJECT)
@ -183,15 +278,18 @@ ck_rhs_map_create(struct ck_rhs *hs, unsigned long entries)
if (n_entries < CK_RHS_PROBE_L1)
return NULL;
size = sizeof(struct ck_rhs_map) +
(sizeof(struct ck_rhs_entry_desc) * n_entries + CK_MD_CACHELINE - 1);
if (hs->mode & CK_RHS_MODE_READ_MOSTLY)
size = sizeof(struct ck_rhs_map) +
(sizeof(void *) * n_entries + sizeof(struct ck_rhs_no_entry_desc) * n_entries + 2 * CK_MD_CACHELINE - 1);
else
size = sizeof(struct ck_rhs_map) +
(sizeof(struct ck_rhs_entry_desc) * n_entries + CK_MD_CACHELINE - 1);
map = hs->m->malloc(size);
if (map == NULL)
return NULL;
map->read_mostly = !!(hs->mode & CK_RHS_MODE_READ_MOSTLY);
map->size = size;
map->offset_mask = (CK_MD_CACHELINE / sizeof(struct ck_rhs_entry_desc)) - 1;
/* We should probably use a more intelligent heuristic for default probe length. */
limit = ck_internal_max(n_entries >> (CK_RHS_PROBE_L1_SHIFT + 2), CK_RHS_PROBE_L1_DEFAULT);
if (limit > UINT_MAX)
@ -205,10 +303,25 @@ ck_rhs_map_create(struct ck_rhs *hs, unsigned long entries)
map->n_entries = 0;
/* Align map allocation to cache line. */
map->descs = (void *)(((uintptr_t)&map[1] +
CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1));
if (map->read_mostly) {
map->entries.no_entries.entries = (void *)(((uintptr_t)&map[1] +
CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1));
map->entries.no_entries.descs = (void *)(((uintptr_t)map->entries.no_entries.entries + (sizeof(void *) * n_entries) + CK_MD_CACHELINE - 1) &~ (CK_MD_CACHELINE - 1));
memset(map->entries.no_entries.entries, 0,
sizeof(void *) * n_entries);
memset(map->entries.no_entries.descs, 0,
sizeof(struct ck_rhs_no_entry_desc));
map->offset_mask = (CK_MD_CACHELINE / sizeof(void *)) - 1;
map->probe_func = ck_rhs_map_probe_rm;
} else {
map->entries.descs = (void *)(((uintptr_t)&map[1] +
CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1));
memset(map->entries.descs, 0, sizeof(struct ck_rhs_entry_desc) * n_entries);
map->offset_mask = (CK_MD_CACHELINE / sizeof(struct ck_rhs_entry_desc)) - 1;
map->probe_func = ck_rhs_map_probe;
}
memset(map->generation, 0, sizeof map->generation);
memset(map->descs, 0, sizeof(*map->descs) * n_entries);
/* Commit entries purge with respect to map publication. */
ck_pr_fence_store();
@ -270,16 +383,20 @@ ck_rhs_map_bound_set(struct ck_rhs_map *m,
unsigned long n_probes)
{
unsigned long offset = h & m->mask;
struct ck_rhs_entry_desc *desc;
if (n_probes > m->probe_maximum)
ck_pr_store_uint(&m->probe_maximum, n_probes);
if (!(m->read_mostly)) {
desc = &m->entries.descs[offset];
if (m->descs[offset].probe_bound < n_probes) {
if (n_probes > CK_RHS_WORD_MAX)
n_probes = CK_RHS_WORD_MAX;
if (desc->probe_bound < n_probes) {
if (n_probes > CK_RHS_WORD_MAX)
n_probes = CK_RHS_WORD_MAX;
CK_RHS_STORE(&m->descs[offset].probe_bound, n_probes);
ck_pr_fence_store();
CK_RHS_STORE(&desc->probe_bound, n_probes);
ck_pr_fence_store();
}
}
return;
@ -291,9 +408,13 @@ ck_rhs_map_bound_get(struct ck_rhs_map *m, unsigned long h)
unsigned long offset = h & m->mask;
unsigned int r = CK_RHS_WORD_MAX;
r = CK_RHS_LOAD(&m->descs[offset].probe_bound);
if (r == CK_RHS_WORD_MAX)
if (m->read_mostly)
r = ck_pr_load_uint(&m->probe_maximum);
else {
r = CK_RHS_LOAD(&m->entries.descs[offset].probe_bound);
if (r == CK_RHS_WORD_MAX)
r = ck_pr_load_uint(&m->probe_maximum);
}
return r;
}
@ -317,7 +438,7 @@ restart:
for (k = 0; k < map->capacity; k++) {
unsigned long h;
prev_saved = previous = map->descs[k].entry;
prev_saved = previous = CK_RHS_ENTRY(map, k);
if (previous == CK_RHS_EMPTY)
continue;
@ -331,7 +452,7 @@ restart:
probes = 0;
for (;;) {
void **cursor = &update->descs[offset].entry;
void **cursor = CK_RHS_ENTRY_ADDR(update, offset);
if (probes++ == update->probe_limit) {
/*
@ -345,10 +466,10 @@ restart:
if (CK_CC_LIKELY(*cursor == CK_RHS_EMPTY)) {
*cursor = prev_saved;
update->n_entries++;
update->descs[offset].probes = probes;
CK_RHS_SET_PROBES(update, offset, probes);
ck_rhs_map_bound_set(update, h, probes);
break;
} else if (update->descs[offset].probes < probes) {
} else if (CK_RHS_PROBES(update, offset) < probes) {
void *tmp = prev_saved;
unsigned int old_probes;
prev_saved = previous = *cursor;
@ -359,12 +480,12 @@ restart:
*cursor = tmp;
ck_rhs_map_bound_set(update, h, probes);
h = hs->hf(previous, hs->seed);
old_probes = update->descs[offset].probes;
update->descs[offset].probes = probes;
old_probes = CK_RHS_PROBES(update, offset);
CK_RHS_SET_PROBES(update, offset, probes);
probes = old_probes - 1;
continue;
}
update->descs[offset].wanted++;
}
CK_RHS_WANTED_INC(update, offset);
offset = ck_rhs_map_probe_next(update, offset, probes);
}
@ -384,11 +505,129 @@ ck_rhs_rebuild(struct ck_rhs *hs)
return ck_rhs_grow(hs, hs->map->capacity);
}
static struct ck_rhs_entry_desc *
/*
 * Probe routine for read-mostly maps: walks the probe sequence for h in the
 * separate entries array (no descriptor loads on the fast path).
 *
 * On return:
 *   - return value: offset of the matching or empty slot, or -1 if the
 *     probe limit was exceeded;
 *   - *object: the entry found at that slot (CK_RHS_EMPTY/NULL if none),
 *     only set when within the probe limit;
 *   - *priority: offset of the first Robin Hood-eligible victim slot seen,
 *     or -1;
 *   - *n_probes: number of probes performed (or probes up to the victim).
 */
static long
ck_rhs_map_probe_rm(struct ck_rhs *hs,
    struct ck_rhs_map *map,
    unsigned long *n_probes,
    long *priority,
    unsigned long h,
    const void *key,
    void **object,
    unsigned long probe_limit,
    enum ck_rhs_probe_behavior behavior)
{
	void *k;
	const void *compare;
	long pr = -1;
	unsigned long offset, probes, opl;

#ifdef CK_RHS_PP
	/* If we are storing object pointers, then we may leverage pointer packing. */
	unsigned long hv = 0;

	if (hs->mode & CK_RHS_MODE_OBJECT) {
		hv = (h >> 25) & CK_RHS_KEY_MASK;
		compare = CK_RHS_VMA(key);
	} else {
		compare = key;
	}
#else
	compare = key;
#endif

	*object = NULL;
	if (behavior != CK_RHS_PROBE_ROBIN_HOOD) {
		probes = 0;
		offset = h & map->mask;
	} else {
		/* Restart from the bucket we were previously in */
		probes = *n_probes;
		offset = ck_rhs_map_probe_next(map, *priority,
		    probes);
	}
	opl = probe_limit;

	for (;;) {
		if (probes++ == probe_limit) {
			if (probe_limit == opl || pr != -1) {
				k = CK_RHS_EMPTY;
				goto leave;
			}
			/*
			 * If no eligible slot has been found yet, continue probe
			 * sequence with original probe limit.
			 */
			probe_limit = opl;
		}
		k = ck_pr_load_ptr(&map->entries.no_entries.entries[offset]);
		if (k == CK_RHS_EMPTY)
			goto leave;
		if ((behavior != CK_RHS_PROBE_NO_RH)) {
			/* Only consult the descriptor array when Robin Hood bookkeeping is needed. */
			struct ck_rhs_entry_desc *desc = (void *)&map->entries.no_entries.descs[offset];

			/* Remember the first slot whose occupant probed less than us (RH victim). */
			if (pr == -1 &&
			    desc->in_rh == false && desc->probes < probes) {
				pr = offset;
				*n_probes = probes;

				if (behavior == CK_RHS_PROBE_RH ||
				    behavior == CK_RHS_PROBE_ROBIN_HOOD) {
					k = CK_RHS_EMPTY;
					goto leave;
				}
			}
		}

		if (behavior != CK_RHS_PROBE_ROBIN_HOOD) {
#ifdef CK_RHS_PP
			if (hs->mode & CK_RHS_MODE_OBJECT) {
				/* Compare packed hash bits before the (costlier) key comparison. */
				if (((uintptr_t)k >> CK_MD_VMA_BITS) != hv) {
					offset = ck_rhs_map_probe_next(map, offset, probes);
					continue;
				}

				k = CK_RHS_VMA(k);
			}
#endif

			if (k == compare)
				goto leave;

			if (hs->compare == NULL) {
				offset = ck_rhs_map_probe_next(map, offset, probes);
				continue;
			}

			if (hs->compare(k, key) == true)
				goto leave;
		}
		offset = ck_rhs_map_probe_next(map, offset, probes);
	}
leave:
	if (probes > probe_limit) {
		offset = -1;
	} else {
		*object = k;
	}

	if (pr == -1)
		*n_probes = probes;

	*priority = pr;
	return offset;
}
static long
ck_rhs_map_probe(struct ck_rhs *hs,
struct ck_rhs_map *map,
unsigned long *n_probes,
struct ck_rhs_entry_desc **priority,
long *priority,
unsigned long h,
const void *key,
void **object,
@ -398,7 +637,7 @@ ck_rhs_map_probe(struct ck_rhs *hs,
void *k;
const void *compare;
struct ck_rhs_entry_desc *cursor, *pr = NULL;
long pr = -1;
unsigned long offset, probes, opl;
#ifdef CK_RHS_PP
@ -422,7 +661,7 @@ ck_rhs_map_probe(struct ck_rhs *hs,
} else {
/* Restart from the bucket we were previously in */
probes = *n_probes;
offset = ck_rhs_map_probe_next(map, *priority - map->descs,
offset = ck_rhs_map_probe_next(map, *priority,
probes);
}
opl = probe_limit;
@ -431,9 +670,8 @@ ck_rhs_map_probe(struct ck_rhs *hs,
for (;;) {
cursor = &map->descs[offset];
if (probes++ == probe_limit) {
if (probe_limit == opl || pr != NULL) {
if (probe_limit == opl || pr != -1) {
k = CK_RHS_EMPTY;
goto leave;
}
@ -444,26 +682,24 @@ ck_rhs_map_probe(struct ck_rhs *hs,
probe_limit = opl;
}
k = ck_pr_load_ptr(&cursor->entry);
k = ck_pr_load_ptr(&map->entries.descs[offset].entry);
if (k == CK_RHS_EMPTY)
goto leave;
if ((behavior != CK_RHS_PROBE_NO_RH) &&
(map->descs[offset].in_rh == false) &&
map->descs[offset].probes <
probes) {
if (pr == NULL) {
pr = &map->descs[offset];
if ((behavior != CK_RHS_PROBE_NO_RH)) {
struct ck_rhs_entry_desc *desc = &map->entries.descs[offset];;
if (pr == -1 &&
desc->in_rh == false && desc->probes < probes) {
pr = offset;
*n_probes = probes;
if (behavior == CK_RHS_PROBE_RH ||
behavior == CK_RHS_PROBE_ROBIN_HOOD) {
behavior == CK_RHS_PROBE_ROBIN_HOOD) {
k = CK_RHS_EMPTY;
goto leave;
}
}
offset = ck_rhs_map_probe_next(map, offset, probes);
continue;
}
@ -495,16 +731,16 @@ ck_rhs_map_probe(struct ck_rhs *hs,
leave:
if (probes > probe_limit) {
cursor = NULL;
offset = -1;
} else {
*object = k;
}
if (pr == NULL)
if (pr == -1)
*n_probes = probes;
*priority = pr;
return cursor;
return offset;
}
static inline void *
@ -535,53 +771,58 @@ ck_rhs_gc(struct ck_rhs *hs)
unsigned int max_probes = 0;
for (i = 0; i < map->capacity; i++) {
if (map->descs[i].probes > max_probes)
max_probes = map->descs[i].probes;
if (CK_RHS_PROBES(map, i) > max_probes)
max_probes = CK_RHS_PROBES(map, i);
}
map->probe_maximum = max_probes;
return true;
}
static void
ck_rhs_add_wanted(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot, struct ck_rhs_entry_desc *old_slot,
ck_rhs_add_wanted(struct ck_rhs *hs, long end_offset, long old_slot,
unsigned long h)
{
struct ck_rhs_map *map = hs->map;
long offset, end_offset;
long offset;
unsigned int probes = 1;
bool found_slot = false;
struct ck_rhs_entry_desc *desc;
offset = h & map->mask;
end_offset = slot - map->descs;
if (old_slot == NULL)
if (old_slot == -1)
found_slot = true;
while (offset != end_offset) {
if (offset == old_slot - map->descs)
if (offset == old_slot)
found_slot = true;
if (found_slot && map->descs[offset].wanted < CK_RHS_MAX_WANTED)
map->descs[offset].wanted++;
if (found_slot) {
desc = CK_RHS_DESC(map, offset);
if (desc->wanted < CK_RHS_MAX_WANTED)
desc->wanted++;
}
offset = ck_rhs_map_probe_next(map, offset, probes);
probes++;
}
}
static unsigned long
ck_rhs_remove_wanted(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot, long limit)
ck_rhs_remove_wanted(struct ck_rhs *hs, long offset, long limit)
{
struct ck_rhs_map *map = hs->map;
int probes = slot->probes;
long offset = slot - map->descs;
int probes = CK_RHS_PROBES(map, offset);
bool do_remove = true;
struct ck_rhs_entry_desc *desc;
while (probes > 1) {
probes--;
offset = ck_rhs_map_probe_prev(map, offset, probes);
if (offset == limit)
do_remove = false;
if (map->descs[offset].wanted != CK_RHS_MAX_WANTED && do_remove)
map->descs[offset].wanted--;
if (do_remove) {
desc = CK_RHS_DESC(map, offset);
if (desc->wanted != CK_RHS_MAX_WANTED)
desc->wanted--;
}
}
return offset;
}
@ -589,7 +830,7 @@ ck_rhs_remove_wanted(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot, long lim
static long
ck_rhs_get_first_offset(struct ck_rhs_map *map, unsigned long offset, unsigned int probes)
{
while (probes > 4) {
while (probes > (unsigned long)map->offset_mask + 1) {
offset -= ((probes - 1) &~ map->offset_mask);
offset &= map->mask;
offset = (offset &~ map->offset_mask) +
@ -603,11 +844,11 @@ ck_rhs_get_first_offset(struct ck_rhs_map *map, unsigned long offset, unsigned i
static int
ck_rhs_put_robin_hood(struct ck_rhs *hs,
struct ck_rhs_entry_desc *orig_slot)
long orig_slot, struct ck_rhs_entry_desc *desc)
{
struct ck_rhs_entry_desc *slot, *first;
long slot, first;
void *object, *insert;
unsigned long orig_probes, n_probes;
unsigned long n_probes;
struct ck_rhs_map *map;
unsigned long h = 0;
long prev;
@ -617,80 +858,83 @@ ck_rhs_put_robin_hood(struct ck_rhs *hs,
map = hs->map;
first = orig_slot;
n_probes = orig_slot->probes;
n_probes = desc->probes;
restart:
orig_probes = n_probes;
key = first->entry;
key = CK_RHS_ENTRY(map, first);
insert = key;
#ifdef CK_RHS_PP
if (hs->mode & CK_RHS_MODE_OBJECT)
key = CK_RHS_VMA(key);
#endif
orig_slot = first;
orig_slot->in_rh = true;
CK_RHS_SET_RH(map, orig_slot);
slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object,
slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object,
map->probe_limit, prevs_nb == CK_RHS_MAX_RH ?
CK_RHS_PROBE_NO_RH : CK_RHS_PROBE_ROBIN_HOOD);
if (slot == NULL && first == NULL) {
if (slot == -1 && first == -1) {
printf("prout!!\n");
if (ck_rhs_grow(hs, map->capacity << 1) == false) {
orig_slot->in_rh = false;
desc->in_rh = false;
for (unsigned int i = 0; i < prevs_nb; i++) {
orig_slot = &map->descs[prevs[i]];
orig_slot->in_rh = false;
CK_RHS_UNSET_RH(map, prevs[i]);
}
printf("lolol\n");
return -1;
}
printf("hehe\n");
return 1;
}
if (first != NULL) {
int old_probes = first->probes;
if (first != -1) {
desc = CK_RHS_DESC(map, first);
int old_probes = desc->probes;
first->probes = n_probes;
h = ck_rhs_get_first_offset(map, first - map->descs, n_probes);
desc->probes = n_probes;
h = ck_rhs_get_first_offset(map, first, n_probes);
ck_rhs_map_bound_set(map, h, n_probes);
prev = orig_slot - map->descs;
prev = orig_slot;
prevs[prevs_nb++] = prev;
n_probes = old_probes;
goto restart;
} else {
/* An empty slot was found. */
h = ck_rhs_get_first_offset(map, slot - map->descs, n_probes);
h = ck_rhs_get_first_offset(map, slot, n_probes);
ck_rhs_map_bound_set(map, h, n_probes);
ck_pr_store_ptr(&slot->entry, insert);
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert);
ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]);
ck_pr_fence_atomic_store();
slot->probes = n_probes;
orig_slot->in_rh = false;
CK_RHS_SET_PROBES(map, slot, n_probes);
desc->in_rh = 0;
ck_rhs_add_wanted(hs, slot, orig_slot, h);
}
while (prevs_nb > 0) {
prev = prevs[--prevs_nb];
n_probes = orig_slot->probes;
ck_pr_store_ptr(&orig_slot->entry, map->descs[prev].entry);
h = ck_rhs_get_first_offset(map, orig_slot - map->descs,
orig_slot->probes);
ck_rhs_add_wanted(hs, orig_slot, &map->descs[prev], h);
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, orig_slot),
CK_RHS_ENTRY(map, prev));
h = ck_rhs_get_first_offset(map, orig_slot,
desc->probes);
ck_rhs_add_wanted(hs, orig_slot, prev, h);
ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]);
ck_pr_fence_atomic_store();
orig_slot = &map->descs[prev];
orig_slot->in_rh = false;
orig_slot = prev;
desc->in_rh = false;
desc = CK_RHS_DESC(map, orig_slot);
}
return 0;
}
static void
ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot)
ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, long slot)
{
struct ck_rhs_map *map = hs->map;
struct ck_rhs_entry_desc *desc = CK_RHS_DESC(map, slot), *new_desc = NULL;
unsigned long h;
h = ck_rhs_remove_wanted(hs, slot, -1);
while (slot->wanted > 0) {
while (desc->wanted > 0) {
unsigned long offset = 0, tmp_offset;
unsigned long wanted_probes = 1;
unsigned int probe = 0;
@ -700,39 +944,42 @@ ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, struct ck_rhs_entry_desc *slo
/* Find a successor */
while (wanted_probes < map->probe_maximum) {
probe = wanted_probes;
offset = ck_rhs_map_probe_next(map, slot - map->descs, probe);
offset = ck_rhs_map_probe_next(map, slot, probe);
while (probe < map->probe_maximum) {
if (map->descs[offset].probes == probe + 1)
break;
new_desc = CK_RHS_DESC(map, offset);
if (new_desc->probes == probe + 1)
break;
probe++;
offset = ck_rhs_map_probe_next(map, offset, probe);
}
if (probe < map->probe_maximum)
break;
break;
wanted_probes++;
}
if (wanted_probes == map->probe_maximum) {
slot->wanted = 0;
desc->wanted = 0;
break;
}
slot->probes = wanted_probes;
desc->probes = wanted_probes;
h = ck_rhs_remove_wanted(hs, &map->descs[offset], slot - map->descs);
ck_pr_store_ptr(&slot->entry, map->descs[offset].entry);
h = ck_rhs_remove_wanted(hs, offset, slot);
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot),
CK_RHS_ENTRY(map, offset));
ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]);
ck_pr_fence_atomic_store();
if (wanted_probes < CK_RHS_WORD_MAX) {
if (map->descs[h].wanted == 1)
CK_RHS_STORE(&map->descs[h].probe_bound,
struct ck_rhs_entry_desc *hdesc = CK_RHS_DESC(map, h);
if (hdesc->wanted == 1)
CK_RHS_STORE(&hdesc->probe_bound,
wanted_probes);
else if (map->descs[h].probe_bound == CK_RHS_WORD_MAX ||
map->descs[h].probe_bound == map->descs[offset].probes) {
else if (hdesc->probe_bound == CK_RHS_WORD_MAX ||
hdesc->probe_bound == new_desc->probes) {
probe++;
if (map->descs[h].probe_bound == CK_RHS_WORD_MAX)
if (hdesc->probe_bound == CK_RHS_WORD_MAX)
max_probes = map->probe_maximum;
else {
max_probes = map->descs[h].probe_bound;
max_probes = hdesc->probe_bound;
max_probes--;
}
tmp_offset = ck_rhs_map_probe_next(map, offset, probe);
@ -743,21 +990,22 @@ ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, struct ck_rhs_entry_desc *slo
probe++;
tmp_offset = ck_rhs_map_probe_next(map, tmp_offset, probe);
}
if (probe >= max_probes)
CK_RHS_STORE(&map->descs[h].probe_bound,
if (probe == max_probes)
CK_RHS_STORE(&hdesc->probe_bound,
wanted_probes);
}
}
if (slot->wanted < CK_RHS_MAX_WANTED)
slot->wanted--;
slot = &map->descs[offset];
if (desc->wanted < CK_RHS_MAX_WANTED)
desc->wanted--;
slot = offset;
desc = new_desc;
}
ck_pr_store_ptr(&slot->entry, CK_RHS_EMPTY);
if ((slot->probes - 1) < CK_RHS_WORD_MAX)
CK_RHS_STORE(&map->descs[h].probe_bound, slot->probes - 1);
slot->probes = 0;
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), CK_RHS_EMPTY);
if ((desc->probes - 1) < CK_RHS_WORD_MAX)
CK_RHS_STORE(CK_RHS_PROBE_BOUND_ADDR(map, h),
desc->probes - 1);
desc->probes = 0;
}
bool
@ -766,14 +1014,15 @@ ck_rhs_fas(struct ck_rhs *hs,
const void *key,
void **previous)
{
struct ck_rhs_entry_desc *slot, *first;
long slot, first;
void *object, *insert;
unsigned long n_probes;
struct ck_rhs_map *map = hs->map;
struct ck_rhs_entry_desc *desc, *desc2;
*previous = NULL;
restart:
slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object,
slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object,
ck_rhs_map_bound_get(map, h), CK_RHS_PROBE);
/* Replacement semantics presume existence. */
@ -782,26 +1031,28 @@ restart:
insert = ck_rhs_marshal(hs->mode, key, h);
if (first != NULL) {
if (first != -1) {
int ret;
desc = CK_RHS_DESC(map, slot);
slot->in_rh = true;
ret = ck_rhs_put_robin_hood(hs, first);
slot->in_rh = false;
desc2 = CK_RHS_DESC(map, first);
desc->in_rh = true;
ret = ck_rhs_put_robin_hood(hs, first, desc2);
desc->in_rh = false;
if (CK_CC_UNLIKELY(ret == 1))
goto restart;
else if (CK_CC_UNLIKELY(ret != 0))
return false;
ck_pr_store_ptr(&first->entry, insert);
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, first), insert);
ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]);
ck_pr_fence_atomic_store();
first->probes = n_probes;
ck_rhs_add_wanted(hs, first, NULL, h);
desc2->probes = n_probes;
ck_rhs_add_wanted(hs, first, -1, h);
ck_rhs_do_backward_shift_delete(hs, slot);
} else {
ck_pr_store_ptr(&slot->entry, insert);
slot->probes = n_probes;
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert);
CK_RHS_SET_PROBES(map, slot, n_probes);
}
*previous = object;
return true;
@ -813,7 +1064,7 @@ ck_rhs_set(struct ck_rhs *hs,
const void *key,
void **previous)
{
struct ck_rhs_entry_desc *slot, *first;
long slot, first;
void *object, *insert;
unsigned long n_probes;
struct ck_rhs_map *map;
@ -823,8 +1074,8 @@ ck_rhs_set(struct ck_rhs *hs,
restart:
map = hs->map;
slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, map->probe_limit, CK_RHS_PROBE_INSERT);
if (slot == NULL && first == NULL) {
slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object, map->probe_limit, CK_RHS_PROBE_INSERT);
if (slot == -1 && first == -1) {
if (ck_rhs_grow(hs, map->capacity << 1) == false)
return false;
@ -834,20 +1085,24 @@ restart:
ck_rhs_map_bound_set(map, h, n_probes);
insert = ck_rhs_marshal(hs->mode, key, h);
if (first != NULL) {
if (slot)
slot->in_rh = true;
int ret = ck_rhs_put_robin_hood(hs, first);
if (slot)
slot->in_rh = false;
if (first != -1) {
struct ck_rhs_entry_desc *desc = NULL, *desc2;
if (slot != -1) {
desc = CK_RHS_DESC(map, slot);
desc->in_rh = true;
}
desc2 = CK_RHS_DESC(map, first);
int ret = ck_rhs_put_robin_hood(hs, first, desc2);
if (slot != -1)
desc->in_rh = false;
if (CK_CC_LIKELY(ret == 1))
goto restart;
if (CK_CC_LIKELY(ret == -1))
return false;
/* If an earlier bucket was found, then store entry there. */
ck_pr_store_ptr(&first->entry, insert);
first->probes = n_probes;
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, first), insert);
desc2->probes = n_probes;
/*
* If a duplicate key was found, then delete it after
* signaling concurrent probes to restart. Optionally,
@ -855,7 +1110,7 @@ restart:
* period if we can guarantee earlier position of
* duplicate key.
*/
ck_rhs_add_wanted(hs, first, NULL, h);
ck_rhs_add_wanted(hs, first, -1, h);
if (object != NULL) {
ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]);
ck_pr_fence_atomic_store();
@ -867,15 +1122,15 @@ restart:
* If we are storing into same slot, then atomic store is sufficient
* for replacement.
*/
ck_pr_store_ptr(&slot->entry, insert);
slot->probes = n_probes;
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert);
CK_RHS_SET_PROBES(map, slot, n_probes);
if (object == NULL)
ck_rhs_add_wanted(hs, slot, NULL, h);
ck_rhs_add_wanted(hs, slot, -1, h);
}
if (object == NULL) {
map->n_entries++;
if ((map->n_entries << 1) > map->capacity)
if ((map->n_entries ) > ((map->capacity * CK_RHS_LOAD_FACTOR) / 100))
ck_rhs_grow(hs, map->capacity << 1);
}
@ -889,19 +1144,18 @@ ck_rhs_put_internal(struct ck_rhs *hs,
const void *key,
enum ck_rhs_probe_behavior behavior)
{
struct ck_rhs_entry_desc *slot, *first;
long slot, first;
void *object, *insert;
unsigned long n_probes;
struct ck_rhs_map *map;
struct ck_rhs_entry_desc *desc;
restart:
map = hs->map;
slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object,
slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object,
map->probe_limit, behavior);
if (slot == NULL && first == NULL) {
if (slot == -1 && first == -1) {
if (ck_rhs_grow(hs, map->capacity << 1) == false)
return false;
@ -915,30 +1169,28 @@ restart:
ck_rhs_map_bound_set(map, h, n_probes);
insert = ck_rhs_marshal(hs->mode, key, h);
if (first != NULL) {
int ret = ck_rhs_put_robin_hood(hs, first);
if (first != -1) {
struct ck_rhs_entry_desc *desc = CK_RHS_DESC(map, first);
int ret = ck_rhs_put_robin_hood(hs, first, desc);
if (CK_CC_UNLIKELY(ret == 1))
return ck_rhs_put_internal(hs, h, key, behavior);
else if (CK_CC_UNLIKELY(ret == -1))
return false;
/* Insert key into first bucket in probe sequence. */
ck_pr_store_ptr(&first->entry, insert);
desc = first;
ck_rhs_add_wanted(hs, first, NULL, h);
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, first), insert);
desc->probes = n_probes;
ck_rhs_add_wanted(hs, first, -1, h);
} else {
/* An empty slot was found. */
ck_pr_store_ptr(&slot->entry, insert);
desc = slot;
ck_rhs_add_wanted(hs, slot, NULL, h);
ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert);
CK_RHS_SET_PROBES(map, slot, n_probes);
ck_rhs_add_wanted(hs, slot, -1, h);
}
desc->probes = n_probes;
map->n_entries++;
if ((map->n_entries << 1) > map->capacity)
if ((map->n_entries ) > ((map->capacity * CK_RHS_LOAD_FACTOR) / 100))
ck_rhs_grow(hs, map->capacity << 1);
return true;
}
@ -965,7 +1217,7 @@ ck_rhs_get(struct ck_rhs *hs,
unsigned long h,
const void *key)
{
struct ck_rhs_entry_desc *first;
long first;
void *object;
struct ck_rhs_map *map;
unsigned long n_probes;
@ -979,8 +1231,8 @@ ck_rhs_get(struct ck_rhs *hs,
probe = ck_rhs_map_bound_get(map, h);
ck_pr_fence_load();
first = NULL;
ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, probe, CK_RHS_PROBE_NO_RH);
first = -1;
map->probe_func(hs, map, &n_probes, &first, h, key, &object, probe, CK_RHS_PROBE_NO_RH);
ck_pr_fence_load();
g_p = ck_pr_load_uint(generation);
@ -994,12 +1246,12 @@ ck_rhs_remove(struct ck_rhs *hs,
unsigned long h,
const void *key)
{
struct ck_rhs_entry_desc *slot, *first;
long slot, first;
void *object;
struct ck_rhs_map *map = hs->map;
unsigned long n_probes;
slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object,
slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object,
ck_rhs_map_bound_get(map, h), CK_RHS_PROBE_NO_RH);
if (object == NULL)
return NULL;

Loading…
Cancel
Save