From 59cedf10c6eb691c6d06289da1d65b1061134c26 Mon Sep 17 00:00:00 2001 From: User Doginou Date: Mon, 6 Jan 2014 16:27:21 +0100 Subject: [PATCH] ck_rhs: Add a read mostly mode. Add a read-mostly mode, in which entries won't share cache lines with associated datas (probes, probe_bound, etc). This makes write operations slower, but make get faster. --- include/ck_rhs.h | 6 + regressions/ck_rhs/benchmark/serial.c | 1 + src/ck_rhs.c | 582 ++++++++++++++++++-------- 3 files changed, 424 insertions(+), 165 deletions(-) diff --git a/include/ck_rhs.h b/include/ck_rhs.h index 67ac0ec..8c8824d 100644 --- a/include/ck_rhs.h +++ b/include/ck_rhs.h @@ -55,6 +55,12 @@ */ #define CK_RHS_MODE_OBJECT 8 +/* + * Indicated that the load is read-mostly, so get should be optimized + * over put and delete + */ +#define CK_RHS_MODE_READ_MOSTLY 16 + /* Currently unsupported. */ #define CK_RHS_MODE_MPMC (void) diff --git a/regressions/ck_rhs/benchmark/serial.c b/regressions/ck_rhs/benchmark/serial.c index 94c7fde..18fa892 100644 --- a/regressions/ck_rhs/benchmark/serial.c +++ b/regressions/ck_rhs/benchmark/serial.c @@ -507,6 +507,7 @@ main(int argc, char *argv[]) global_seed = common_lrand48(); run_test(argv[1], r, size, 0); + run_test(argv[1], r, size, CK_RHS_MODE_READ_MOSTLY); fprintf(stderr, "# reverse_insertion serial_insertion random_insertion serial_swap " "serial_replace reverse_get serial_get random_get serial_remove negative_get tombstone " "set_unique gc rebuild\n\n"); diff --git a/src/ck_rhs.c b/src/ck_rhs.c index ab6e2ba..377b843 100644 --- a/src/ck_rhs.c +++ b/src/ck_rhs.c @@ -84,15 +84,31 @@ enum ck_rhs_probe_behavior { CK_RHS_PROBE_ROBIN_HOOD, /* Look for the first slot available for the entry we are about to replace, only used to internally implement Robin Hood */ CK_RHS_PROBE_NO_RH, /* Don't do the RH dance */ }; - struct ck_rhs_entry_desc { - void *entry; unsigned int probes; unsigned short wanted; CK_RHS_WORD probe_bound; bool in_rh; + void *entry; } __attribute__ 
((__aligned__(16))); +struct ck_rhs_no_entry_desc { + unsigned int probes; + unsigned short wanted; + CK_RHS_WORD probe_bound; + bool in_rh; +} __attribute__ ((__aligned__(8))); + +typedef long (*probe_func) (struct ck_rhs *hs, + struct ck_rhs_map *map, + unsigned long *n_probes, + long *priority, + unsigned long h, + const void *key, + void **object, + unsigned long probe_limit, + enum ck_rhs_probe_behavior behavior); + struct ck_rhs_map { unsigned int generation[CK_RHS_G]; unsigned int probe_maximum; @@ -103,9 +119,88 @@ struct ck_rhs_map { unsigned long capacity; unsigned long size; char offset_mask; - struct ck_rhs_entry_desc *descs; + union { + struct ck_rhs_entry_desc *descs; + struct ck_rhs_no_entry { + void **entries; + struct ck_rhs_no_entry_desc *descs; + } no_entries; + } entries; + bool read_mostly; + probe_func probe_func; }; +#define CK_RHS_ENTRY(map, offset) (map->read_mostly ? map->entries.no_entries.entries[offset] : map->entries.descs[offset].entry) +#define CK_RHS_ENTRY_ADDR(map, offset) (map->read_mostly ? &map->entries.no_entries.entries[offset] : &map->entries.descs[offset].entry) +#define CK_RHS_DESC(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? (struct ck_rhs_entry_desc *)(void *)&map->entries.no_entries.descs[offset] : &map->entries.descs[offset]) +#define CK_RHS_WANTED_INC(map, offset) do { \ + if (CK_CC_UNLIKELY(map->read_mostly)) \ + map->entries.no_entries.descs[offset].wanted++; \ + else \ + map->entries.descs[offset].wanted++; \ +} while (0) +#define CK_RHS_WANTED(map, offset) (CK_CC_UNLIKELY(map->read_mostly )? 
map->entries.no_entries.descs[offset].wanted : map->entries.descs[offset].wanted) +#define CK_RHS_SET_WANTED(map, offset, value) do { \ + if (CK_CC_UNLIKELY(map->read_mostly)) \ + map->entries.no_entries.descs[offset].wanted = value; \ + else \ + map->entries.descs[offset].wanted = value; \ +} while (0) + +#define CK_RHS_WANTED_DEC(map, offset) do { \ + if (CK_CC_UNLIKELY(map->read_mostly)) \ + map->entries.no_entries.descs[offset].wanted--; \ + else \ + map->entries.descs[offset].wanted--; \ +} while (0) + +#define CK_RHS_PROBES(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? map->entries.no_entries.descs[offset].probes : map->entries.descs[offset].probes) +#define CK_RHS_SET_PROBES(map, offset, value) do { \ + if (CK_CC_UNLIKELY(map->read_mostly)) \ + map->entries.no_entries.descs[offset].probes = value; \ + else \ + map->entries.descs[offset].probes = value; \ +} while (0) +#define CK_RHS_PROBE_BOUND(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? map->entries.no_entries.descs[offset].probe_bound : map->entries.descs[offset].probe_bound) +#define CK_RHS_PROBE_BOUND_ADDR(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ? (&map->entries.no_entries.descs[offset].probe_bound) : (&map->entries.descs[offset].probe_bound)) +#define CK_RHS_IN_RH(map, offset) (CK_CC_UNLIKELY(map->read_mostly) ?
map->entries.no_entries.descs[offset].in_rh : map->entries.descs[offset].in_rh) +#define CK_RHS_SET_RH(map, offset) do { \ + if (CK_CC_UNLIKELY(map->read_mostly)) \ + map->entries.no_entries.descs[offset].in_rh = true; \ + else \ + map->entries.descs[offset].in_rh = true; \ +} while (0) +#define CK_RHS_UNSET_RH(map, offset) do { \ + if (CK_CC_UNLIKELY(map->read_mostly)) \ + map->entries.no_entries.descs[offset].in_rh = false; \ + else \ + map->entries.descs[offset].in_rh = false; \ +} while (0) + +#define CK_RHS_LOAD_FACTOR 50 + +static long +ck_rhs_map_probe(struct ck_rhs *hs, + struct ck_rhs_map *map, + unsigned long *n_probes, + long *priority, + unsigned long h, + const void *key, + void **object, + unsigned long probe_limit, + enum ck_rhs_probe_behavior behavior); + +static long +ck_rhs_map_probe_rm(struct ck_rhs *hs, + struct ck_rhs_map *map, + unsigned long *n_probes, + long *priority, + unsigned long h, + const void *key, + void **object, + unsigned long probe_limit, + enum ck_rhs_probe_behavior behavior); + void ck_rhs_iterator_init(struct ck_rhs_iterator *iterator) { @@ -125,7 +220,7 @@ ck_rhs_next(struct ck_rhs *hs, struct ck_rhs_iterator *i, void **key) return false; do { - value = map->descs[i->offset].entry; + value = CK_RHS_ENTRY(map, i->offset); if (value != CK_RHS_EMPTY) { #ifdef CK_RHS_PP if (hs->mode & CK_RHS_MODE_OBJECT) @@ -183,15 +278,18 @@ ck_rhs_map_create(struct ck_rhs *hs, unsigned long entries) if (n_entries < CK_RHS_PROBE_L1) return NULL; - size = sizeof(struct ck_rhs_map) + - (sizeof(struct ck_rhs_entry_desc) * n_entries + CK_MD_CACHELINE - 1); - + if (hs->mode & CK_RHS_MODE_READ_MOSTLY) + size = sizeof(struct ck_rhs_map) + + (sizeof(void *) * n_entries + sizeof(struct ck_rhs_no_entry_desc) * n_entries + 2 * CK_MD_CACHELINE - 1); + else + size = sizeof(struct ck_rhs_map) + + (sizeof(struct ck_rhs_entry_desc) * n_entries + CK_MD_CACHELINE - 1); map = hs->m->malloc(size); if (map == NULL) return NULL; + map->read_mostly = !!(hs->mode & 
CK_RHS_MODE_READ_MOSTLY); map->size = size; - map->offset_mask = (CK_MD_CACHELINE / sizeof(struct ck_rhs_entry_desc)) - 1; /* We should probably use a more intelligent heuristic for default probe length. */ limit = ck_internal_max(n_entries >> (CK_RHS_PROBE_L1_SHIFT + 2), CK_RHS_PROBE_L1_DEFAULT); if (limit > UINT_MAX) @@ -205,10 +303,25 @@ ck_rhs_map_create(struct ck_rhs *hs, unsigned long entries) map->n_entries = 0; /* Align map allocation to cache line. */ - map->descs = (void *)(((uintptr_t)&map[1] + - CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1)); + if (map->read_mostly) { + map->entries.no_entries.entries = (void *)(((uintptr_t)&map[1] + + CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1)); + map->entries.no_entries.descs = (void *)(((uintptr_t)map->entries.no_entries.entries + (sizeof(void *) * n_entries) + CK_MD_CACHELINE - 1) &~ (CK_MD_CACHELINE - 1)); + memset(map->entries.no_entries.entries, 0, + sizeof(void *) * n_entries); + memset(map->entries.no_entries.descs, 0, + sizeof(struct ck_rhs_no_entry_desc) * n_entries); + map->offset_mask = (CK_MD_CACHELINE / sizeof(void *)) - 1; + map->probe_func = ck_rhs_map_probe_rm; + + } else { + map->entries.descs = (void *)(((uintptr_t)&map[1] + + CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1)); + memset(map->entries.descs, 0, sizeof(struct ck_rhs_entry_desc) * n_entries); + map->offset_mask = (CK_MD_CACHELINE / sizeof(struct ck_rhs_entry_desc)) - 1; + map->probe_func = ck_rhs_map_probe; + } memset(map->generation, 0, sizeof map->generation); - memset(map->descs, 0, sizeof(*map->descs) * n_entries); /* Commit entries purge with respect to map publication.
*/ ck_pr_fence_store(); @@ -270,16 +383,20 @@ ck_rhs_map_bound_set(struct ck_rhs_map *m, unsigned long n_probes) { unsigned long offset = h & m->mask; + struct ck_rhs_entry_desc *desc; if (n_probes > m->probe_maximum) ck_pr_store_uint(&m->probe_maximum, n_probes); + if (!(m->read_mostly)) { + desc = &m->entries.descs[offset]; - if (m->descs[offset].probe_bound < n_probes) { - if (n_probes > CK_RHS_WORD_MAX) - n_probes = CK_RHS_WORD_MAX; + if (desc->probe_bound < n_probes) { + if (n_probes > CK_RHS_WORD_MAX) + n_probes = CK_RHS_WORD_MAX; - CK_RHS_STORE(&m->descs[offset].probe_bound, n_probes); - ck_pr_fence_store(); + CK_RHS_STORE(&desc->probe_bound, n_probes); + ck_pr_fence_store(); + } } return; @@ -291,9 +408,13 @@ ck_rhs_map_bound_get(struct ck_rhs_map *m, unsigned long h) unsigned long offset = h & m->mask; unsigned int r = CK_RHS_WORD_MAX; - r = CK_RHS_LOAD(&m->descs[offset].probe_bound); - if (r == CK_RHS_WORD_MAX) + if (m->read_mostly) r = ck_pr_load_uint(&m->probe_maximum); + else { + r = CK_RHS_LOAD(&m->entries.descs[offset].probe_bound); + if (r == CK_RHS_WORD_MAX) + r = ck_pr_load_uint(&m->probe_maximum); + } return r; } @@ -317,7 +438,7 @@ restart: for (k = 0; k < map->capacity; k++) { unsigned long h; - prev_saved = previous = map->descs[k].entry; + prev_saved = previous = CK_RHS_ENTRY(map, k); if (previous == CK_RHS_EMPTY) continue; @@ -331,7 +452,7 @@ restart: probes = 0; for (;;) { - void **cursor = &update->descs[offset].entry; + void **cursor = CK_RHS_ENTRY_ADDR(update, offset); if (probes++ == update->probe_limit) { /* @@ -345,10 +466,10 @@ restart: if (CK_CC_LIKELY(*cursor == CK_RHS_EMPTY)) { *cursor = prev_saved; update->n_entries++; - update->descs[offset].probes = probes; + CK_RHS_SET_PROBES(update, offset, probes); ck_rhs_map_bound_set(update, h, probes); break; - } else if (update->descs[offset].probes < probes) { + } else if (CK_RHS_PROBES(update, offset) < probes) { void *tmp = prev_saved; unsigned int old_probes; prev_saved = previous = 
*cursor; @@ -359,12 +480,12 @@ restart: *cursor = tmp; ck_rhs_map_bound_set(update, h, probes); h = hs->hf(previous, hs->seed); - old_probes = update->descs[offset].probes; - update->descs[offset].probes = probes; + old_probes = CK_RHS_PROBES(update, offset); + CK_RHS_SET_PROBES(update, offset, probes); probes = old_probes - 1; continue; - } - update->descs[offset].wanted++; + } + CK_RHS_WANTED_INC(update, offset); offset = ck_rhs_map_probe_next(update, offset, probes); } @@ -384,11 +505,129 @@ ck_rhs_rebuild(struct ck_rhs *hs) return ck_rhs_grow(hs, hs->map->capacity); } -static struct ck_rhs_entry_desc * +static long +ck_rhs_map_probe_rm(struct ck_rhs *hs, + struct ck_rhs_map *map, + unsigned long *n_probes, + long *priority, + unsigned long h, + const void *key, + void **object, + unsigned long probe_limit, + enum ck_rhs_probe_behavior behavior) +{ + + void *k; + const void *compare; + long pr = -1; + unsigned long offset, probes, opl; + +#ifdef CK_RHS_PP + /* If we are storing object pointers, then we may leverage pointer packing. */ + unsigned long hv = 0; + + if (hs->mode & CK_RHS_MODE_OBJECT) { + hv = (h >> 25) & CK_RHS_KEY_MASK; + compare = CK_RHS_VMA(key); + } else { + compare = key; + } +#else + compare = key; +#endif + + *object = NULL; + if (behavior != CK_RHS_PROBE_ROBIN_HOOD) { + probes = 0; + offset = h & map->mask; + } else { + /* Restart from the bucket we were previously in */ + probes = *n_probes; + offset = ck_rhs_map_probe_next(map, *priority, + probes); + } + opl = probe_limit; + + + for (;;) { + if (probes++ == probe_limit) { + if (probe_limit == opl || pr != -1) { + k = CK_RHS_EMPTY; + goto leave; + } + /* + * If no eligible slot has been found yet, continue probe + * sequence with original probe limit. 
+ */ + probe_limit = opl; + } + + k = ck_pr_load_ptr(&map->entries.no_entries.entries[offset]); + if (k == CK_RHS_EMPTY) + goto leave; + + if ((behavior != CK_RHS_PROBE_NO_RH)) { + struct ck_rhs_entry_desc *desc = (void *)&map->entries.no_entries.descs[offset]; + + if (pr == -1 && + desc->in_rh == false && desc->probes < probes) { + pr = offset; + *n_probes = probes; + + if (behavior == CK_RHS_PROBE_RH || + behavior == CK_RHS_PROBE_ROBIN_HOOD) { + k = CK_RHS_EMPTY; + goto leave; + } + } + } + + + if (behavior != CK_RHS_PROBE_ROBIN_HOOD) { +#ifdef CK_RHS_PP + if (hs->mode & CK_RHS_MODE_OBJECT) { + if (((uintptr_t)k >> CK_MD_VMA_BITS) != hv) { + offset = ck_rhs_map_probe_next(map, offset, probes); + continue; + } + + k = CK_RHS_VMA(k); + } +#endif + + if (k == compare) + goto leave; + + if (hs->compare == NULL) { + offset = ck_rhs_map_probe_next(map, offset, probes); + continue; + } + + if (hs->compare(k, key) == true) + goto leave; + } + offset = ck_rhs_map_probe_next(map, offset, probes); + } + +leave: + if (probes > probe_limit) { + offset = -1; + } else { + *object = k; + } + + if (pr == -1) + *n_probes = probes; + + *priority = pr; + return offset; +} + +static long ck_rhs_map_probe(struct ck_rhs *hs, struct ck_rhs_map *map, unsigned long *n_probes, - struct ck_rhs_entry_desc **priority, + long *priority, unsigned long h, const void *key, void **object, @@ -398,7 +637,7 @@ ck_rhs_map_probe(struct ck_rhs *hs, void *k; const void *compare; - struct ck_rhs_entry_desc *cursor, *pr = NULL; + long pr = -1; unsigned long offset, probes, opl; #ifdef CK_RHS_PP @@ -422,7 +661,7 @@ ck_rhs_map_probe(struct ck_rhs *hs, } else { /* Restart from the bucket we were previously in */ probes = *n_probes; - offset = ck_rhs_map_probe_next(map, *priority - map->descs, + offset = ck_rhs_map_probe_next(map, *priority, probes); } opl = probe_limit; @@ -431,9 +670,8 @@ ck_rhs_map_probe(struct ck_rhs *hs, for (;;) { - cursor = &map->descs[offset]; if (probes++ == probe_limit) { - if 
(probe_limit == opl || pr != NULL) { + if (probe_limit == opl || pr != -1) { k = CK_RHS_EMPTY; goto leave; } @@ -444,26 +682,24 @@ ck_rhs_map_probe(struct ck_rhs *hs, probe_limit = opl; } - k = ck_pr_load_ptr(&cursor->entry); + k = ck_pr_load_ptr(&map->entries.descs[offset].entry); if (k == CK_RHS_EMPTY) goto leave; - if ((behavior != CK_RHS_PROBE_NO_RH) && - (map->descs[offset].in_rh == false) && - map->descs[offset].probes < - probes) { - if (pr == NULL) { - pr = &map->descs[offset]; + if ((behavior != CK_RHS_PROBE_NO_RH)) { + struct ck_rhs_entry_desc *desc = &map->entries.descs[offset];; + + if (pr == -1 && + desc->in_rh == false && desc->probes < probes) { + pr = offset; *n_probes = probes; if (behavior == CK_RHS_PROBE_RH || - behavior == CK_RHS_PROBE_ROBIN_HOOD) { + behavior == CK_RHS_PROBE_ROBIN_HOOD) { k = CK_RHS_EMPTY; goto leave; } } - offset = ck_rhs_map_probe_next(map, offset, probes); - continue; } @@ -495,16 +731,16 @@ ck_rhs_map_probe(struct ck_rhs *hs, leave: if (probes > probe_limit) { - cursor = NULL; + offset = -1; } else { *object = k; } - if (pr == NULL) + if (pr == -1) *n_probes = probes; *priority = pr; - return cursor; + return offset; } static inline void * @@ -535,53 +771,58 @@ ck_rhs_gc(struct ck_rhs *hs) unsigned int max_probes = 0; for (i = 0; i < map->capacity; i++) { - if (map->descs[i].probes > max_probes) - max_probes = map->descs[i].probes; + if (CK_RHS_PROBES(map, i) > max_probes) + max_probes = CK_RHS_PROBES(map, i); } map->probe_maximum = max_probes; return true; } static void -ck_rhs_add_wanted(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot, struct ck_rhs_entry_desc *old_slot, +ck_rhs_add_wanted(struct ck_rhs *hs, long end_offset, long old_slot, unsigned long h) { struct ck_rhs_map *map = hs->map; - long offset, end_offset; + long offset; unsigned int probes = 1; bool found_slot = false; + struct ck_rhs_entry_desc *desc; offset = h & map->mask; - end_offset = slot - map->descs; - if (old_slot == NULL) + if (old_slot == -1) 
found_slot = true; while (offset != end_offset) { - if (offset == old_slot - map->descs) + if (offset == old_slot) found_slot = true; - if (found_slot && map->descs[offset].wanted < CK_RHS_MAX_WANTED) - map->descs[offset].wanted++; + if (found_slot) { + desc = CK_RHS_DESC(map, offset); + if (desc->wanted < CK_RHS_MAX_WANTED) + desc->wanted++; + } offset = ck_rhs_map_probe_next(map, offset, probes); probes++; } } static unsigned long -ck_rhs_remove_wanted(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot, long limit) +ck_rhs_remove_wanted(struct ck_rhs *hs, long offset, long limit) { struct ck_rhs_map *map = hs->map; - int probes = slot->probes; - long offset = slot - map->descs; + int probes = CK_RHS_PROBES(map, offset); bool do_remove = true; + struct ck_rhs_entry_desc *desc; while (probes > 1) { probes--; offset = ck_rhs_map_probe_prev(map, offset, probes); if (offset == limit) do_remove = false; - if (map->descs[offset].wanted != CK_RHS_MAX_WANTED && do_remove) - map->descs[offset].wanted--; - + if (do_remove) { + desc = CK_RHS_DESC(map, offset); + if (desc->wanted != CK_RHS_MAX_WANTED) + desc->wanted--; + } } return offset; } @@ -589,7 +830,7 @@ ck_rhs_remove_wanted(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot, long lim static long ck_rhs_get_first_offset(struct ck_rhs_map *map, unsigned long offset, unsigned int probes) { - while (probes > 4) { + while (probes > (unsigned long)map->offset_mask + 1) { offset -= ((probes - 1) &~ map->offset_mask); offset &= map->mask; offset = (offset &~ map->offset_mask) + @@ -603,11 +844,11 @@ ck_rhs_get_first_offset(struct ck_rhs_map *map, unsigned long offset, unsigned i static int ck_rhs_put_robin_hood(struct ck_rhs *hs, - struct ck_rhs_entry_desc *orig_slot) + long orig_slot, struct ck_rhs_entry_desc *desc) { - struct ck_rhs_entry_desc *slot, *first; + long slot, first; void *object, *insert; - unsigned long orig_probes, n_probes; + unsigned long n_probes; struct ck_rhs_map *map; unsigned long h = 0; long prev; @@ 
-617,80 +858,80 @@ ck_rhs_put_robin_hood(struct ck_rhs *hs, map = hs->map; first = orig_slot; - n_probes = orig_slot->probes; + n_probes = desc->probes; restart: - orig_probes = n_probes; - key = first->entry; + key = CK_RHS_ENTRY(map, first); insert = key; #ifdef CK_RHS_PP if (hs->mode & CK_RHS_MODE_OBJECT) key = CK_RHS_VMA(key); #endif orig_slot = first; - orig_slot->in_rh = true; + CK_RHS_SET_RH(map, orig_slot); - slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, + slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object, map->probe_limit, prevs_nb == CK_RHS_MAX_RH ? CK_RHS_PROBE_NO_RH : CK_RHS_PROBE_ROBIN_HOOD); - if (slot == NULL && first == NULL) { + if (slot == -1 && first == -1) { if (ck_rhs_grow(hs, map->capacity << 1) == false) { - orig_slot->in_rh = false; + desc->in_rh = false; for (unsigned int i = 0; i < prevs_nb; i++) { - orig_slot = &map->descs[prevs[i]]; - orig_slot->in_rh = false; + CK_RHS_UNSET_RH(map, prevs[i]); } return -1; } return 1; } - if (first != NULL) { - int old_probes = first->probes; + if (first != -1) { + desc = CK_RHS_DESC(map, first); + int old_probes = desc->probes; - first->probes = n_probes; - h = ck_rhs_get_first_offset(map, first - map->descs, n_probes); + desc->probes = n_probes; + h = ck_rhs_get_first_offset(map, first, n_probes); ck_rhs_map_bound_set(map, h, n_probes); - prev = orig_slot - map->descs; + prev = orig_slot; prevs[prevs_nb++] = prev; n_probes = old_probes; goto restart; } else { /* An empty slot was found.
*/ - h = ck_rhs_get_first_offset(map, slot - map->descs, n_probes); + h = ck_rhs_get_first_offset(map, slot, n_probes); ck_rhs_map_bound_set(map, h, n_probes); - ck_pr_store_ptr(&slot->entry, insert); + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert); ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]); ck_pr_fence_atomic_store(); - slot->probes = n_probes; - orig_slot->in_rh = false; + CK_RHS_SET_PROBES(map, slot, n_probes); + desc->in_rh = 0; ck_rhs_add_wanted(hs, slot, orig_slot, h); } while (prevs_nb > 0) { prev = prevs[--prevs_nb]; - n_probes = orig_slot->probes; - ck_pr_store_ptr(&orig_slot->entry, map->descs[prev].entry); - h = ck_rhs_get_first_offset(map, orig_slot - map->descs, - orig_slot->probes); - ck_rhs_add_wanted(hs, orig_slot, &map->descs[prev], h); + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, orig_slot), + CK_RHS_ENTRY(map, prev)); + h = ck_rhs_get_first_offset(map, orig_slot, + desc->probes); + ck_rhs_add_wanted(hs, orig_slot, prev, h); ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]); ck_pr_fence_atomic_store(); - orig_slot = &map->descs[prev]; - - orig_slot->in_rh = false; + orig_slot = prev; + desc->in_rh = false; + desc = CK_RHS_DESC(map, orig_slot); } return 0; } static void -ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, struct ck_rhs_entry_desc *slot) +ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, long slot) { struct ck_rhs_map *map = hs->map; + struct ck_rhs_entry_desc *desc = CK_RHS_DESC(map, slot), *new_desc = NULL; unsigned long h; h = ck_rhs_remove_wanted(hs, slot, -1); - while (slot->wanted > 0) { + while (desc->wanted > 0) { unsigned long offset = 0, tmp_offset; unsigned long wanted_probes = 1; unsigned int probe = 0; @@ -700,39 +944,42 @@ ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, struct ck_rhs_entry_desc *slo /* Find a successor */ while (wanted_probes < map->probe_maximum) { probe = wanted_probes; - offset = ck_rhs_map_probe_next(map, slot - map->descs, probe); + offset = ck_rhs_map_probe_next(map, slot, 
probe); while (probe < map->probe_maximum) { - if (map->descs[offset].probes == probe + 1) - break; + new_desc = CK_RHS_DESC(map, offset); + if (new_desc->probes == probe + 1) + break; probe++; offset = ck_rhs_map_probe_next(map, offset, probe); } if (probe < map->probe_maximum) - break; + break; wanted_probes++; } if (wanted_probes == map->probe_maximum) { - slot->wanted = 0; + desc->wanted = 0; break; } - slot->probes = wanted_probes; + desc->probes = wanted_probes; - h = ck_rhs_remove_wanted(hs, &map->descs[offset], slot - map->descs); - ck_pr_store_ptr(&slot->entry, map->descs[offset].entry); + h = ck_rhs_remove_wanted(hs, offset, slot); + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), + CK_RHS_ENTRY(map, offset)); ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]); ck_pr_fence_atomic_store(); if (wanted_probes < CK_RHS_WORD_MAX) { - if (map->descs[h].wanted == 1) - CK_RHS_STORE(&map->descs[h].probe_bound, + struct ck_rhs_entry_desc *hdesc = CK_RHS_DESC(map, h); + if (hdesc->wanted == 1) + CK_RHS_STORE(&hdesc->probe_bound, wanted_probes); - else if (map->descs[h].probe_bound == CK_RHS_WORD_MAX || - map->descs[h].probe_bound == map->descs[offset].probes) { + else if (hdesc->probe_bound == CK_RHS_WORD_MAX || + hdesc->probe_bound == new_desc->probes) { probe++; - if (map->descs[h].probe_bound == CK_RHS_WORD_MAX) + if (hdesc->probe_bound == CK_RHS_WORD_MAX) max_probes = map->probe_maximum; else { - max_probes = map->descs[h].probe_bound; + max_probes = hdesc->probe_bound; max_probes--; } tmp_offset = ck_rhs_map_probe_next(map, offset, probe); @@ -743,21 +990,22 @@ ck_rhs_do_backward_shift_delete(struct ck_rhs *hs, struct ck_rhs_entry_desc *slo probe++; tmp_offset = ck_rhs_map_probe_next(map, tmp_offset, probe); } - if (probe >= max_probes) - CK_RHS_STORE(&map->descs[h].probe_bound, + if (probe == max_probes) + CK_RHS_STORE(&hdesc->probe_bound, wanted_probes); } } - if (slot->wanted < CK_RHS_MAX_WANTED) - slot->wanted--; - slot = &map->descs[offset]; + if 
(desc->wanted < CK_RHS_MAX_WANTED) + desc->wanted--; + slot = offset; + desc = new_desc; } - ck_pr_store_ptr(&slot->entry, CK_RHS_EMPTY); - if ((slot->probes - 1) < CK_RHS_WORD_MAX) - CK_RHS_STORE(&map->descs[h].probe_bound, slot->probes - 1); - - slot->probes = 0; + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), CK_RHS_EMPTY); + if ((desc->probes - 1) < CK_RHS_WORD_MAX) + CK_RHS_STORE(CK_RHS_PROBE_BOUND_ADDR(map, h), + desc->probes - 1); + desc->probes = 0; } bool @@ -766,14 +1014,15 @@ ck_rhs_fas(struct ck_rhs *hs, const void *key, void **previous) { - struct ck_rhs_entry_desc *slot, *first; + long slot, first; void *object, *insert; unsigned long n_probes; struct ck_rhs_map *map = hs->map; + struct ck_rhs_entry_desc *desc, *desc2; *previous = NULL; restart: - slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, + slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object, ck_rhs_map_bound_get(map, h), CK_RHS_PROBE); /* Replacement semantics presume existence. */ @@ -782,26 +1031,28 @@ restart: insert = ck_rhs_marshal(hs->mode, key, h); - if (first != NULL) { + if (first != -1) { int ret; + desc = CK_RHS_DESC(map, slot); - slot->in_rh = true; - ret = ck_rhs_put_robin_hood(hs, first); - slot->in_rh = false; + desc2 = CK_RHS_DESC(map, first); + desc->in_rh = true; + ret = ck_rhs_put_robin_hood(hs, first, desc2); + desc->in_rh = false; if (CK_CC_UNLIKELY(ret == 1)) goto restart; else if (CK_CC_UNLIKELY(ret != 0)) return false; - ck_pr_store_ptr(&first->entry, insert); + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, first), insert); ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]); ck_pr_fence_atomic_store(); - first->probes = n_probes; - ck_rhs_add_wanted(hs, first, NULL, h); + desc2->probes = n_probes; + ck_rhs_add_wanted(hs, first, -1, h); ck_rhs_do_backward_shift_delete(hs, slot); } else { - ck_pr_store_ptr(&slot->entry, insert); - slot->probes = n_probes; + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert); + CK_RHS_SET_PROBES(map, slot, 
n_probes); } *previous = object; return true; @@ -813,7 +1064,7 @@ ck_rhs_set(struct ck_rhs *hs, const void *key, void **previous) { - struct ck_rhs_entry_desc *slot, *first; + long slot, first; void *object, *insert; unsigned long n_probes; struct ck_rhs_map *map; @@ -823,8 +1074,8 @@ ck_rhs_set(struct ck_rhs *hs, restart: map = hs->map; - slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, map->probe_limit, CK_RHS_PROBE_INSERT); - if (slot == NULL && first == NULL) { + slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object, map->probe_limit, CK_RHS_PROBE_INSERT); + if (slot == -1 && first == -1) { if (ck_rhs_grow(hs, map->capacity << 1) == false) return false; @@ -834,20 +1085,24 @@ restart: ck_rhs_map_bound_set(map, h, n_probes); insert = ck_rhs_marshal(hs->mode, key, h); - if (first != NULL) { - if (slot) - slot->in_rh = true; - int ret = ck_rhs_put_robin_hood(hs, first); - if (slot) - slot->in_rh = false; + if (first != -1) { + struct ck_rhs_entry_desc *desc = NULL, *desc2; + if (slot != -1) { + desc = CK_RHS_DESC(map, slot); + desc->in_rh = true; + } + desc2 = CK_RHS_DESC(map, first); + int ret = ck_rhs_put_robin_hood(hs, first, desc2); + if (slot != -1) + desc->in_rh = false; if (CK_CC_LIKELY(ret == 1)) goto restart; if (CK_CC_LIKELY(ret == -1)) return false; /* If an earlier bucket was found, then store entry there. */ - ck_pr_store_ptr(&first->entry, insert); - first->probes = n_probes; + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, first), insert); + desc2->probes = n_probes; /* * If a duplicate key was found, then delete it after * signaling concurrent probes to restart. Optionally, @@ -855,7 +1110,7 @@ restart: * period if we can guarantee earlier position of * duplicate key. 
*/ - ck_rhs_add_wanted(hs, first, NULL, h); + ck_rhs_add_wanted(hs, first, -1, h); if (object != NULL) { ck_pr_inc_uint(&map->generation[h & CK_RHS_G_MASK]); ck_pr_fence_atomic_store(); @@ -867,15 +1122,15 @@ restart: * If we are storing into same slot, then atomic store is sufficient * for replacement. */ - ck_pr_store_ptr(&slot->entry, insert); - slot->probes = n_probes; + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert); + CK_RHS_SET_PROBES(map, slot, n_probes); if (object == NULL) - ck_rhs_add_wanted(hs, slot, NULL, h); + ck_rhs_add_wanted(hs, slot, -1, h); } if (object == NULL) { map->n_entries++; - if ((map->n_entries << 1) > map->capacity) + if ((map->n_entries ) > ((map->capacity * CK_RHS_LOAD_FACTOR) / 100)) ck_rhs_grow(hs, map->capacity << 1); } @@ -889,19 +1144,18 @@ ck_rhs_put_internal(struct ck_rhs *hs, const void *key, enum ck_rhs_probe_behavior behavior) { - struct ck_rhs_entry_desc *slot, *first; + long slot, first; void *object, *insert; unsigned long n_probes; struct ck_rhs_map *map; - struct ck_rhs_entry_desc *desc; restart: map = hs->map; - slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, + slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object, map->probe_limit, behavior); - if (slot == NULL && first == NULL) { + if (slot == -1 && first == -1) { if (ck_rhs_grow(hs, map->capacity << 1) == false) return false; @@ -915,30 +1169,28 @@ restart: ck_rhs_map_bound_set(map, h, n_probes); insert = ck_rhs_marshal(hs->mode, key, h); - if (first != NULL) { - int ret = ck_rhs_put_robin_hood(hs, first); - + if (first != -1) { + struct ck_rhs_entry_desc *desc = CK_RHS_DESC(map, first); + int ret = ck_rhs_put_robin_hood(hs, first, desc); if (CK_CC_UNLIKELY(ret == 1)) return ck_rhs_put_internal(hs, h, key, behavior); else if (CK_CC_UNLIKELY(ret == -1)) return false; /* Insert key into first bucket in probe sequence. 
*/ - ck_pr_store_ptr(&first->entry, insert); - desc = first; - ck_rhs_add_wanted(hs, first, NULL, h); - + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, first), insert); + desc->probes = n_probes; + ck_rhs_add_wanted(hs, first, -1, h); } else { /* An empty slot was found. */ - ck_pr_store_ptr(&slot->entry, insert); - desc = slot; - ck_rhs_add_wanted(hs, slot, NULL, h); + ck_pr_store_ptr(CK_RHS_ENTRY_ADDR(map, slot), insert); + CK_RHS_SET_PROBES(map, slot, n_probes); + ck_rhs_add_wanted(hs, slot, -1, h); } - desc->probes = n_probes; map->n_entries++; - if ((map->n_entries << 1) > map->capacity) + + if ((map->n_entries ) > ((map->capacity * CK_RHS_LOAD_FACTOR) / 100)) ck_rhs_grow(hs, map->capacity << 1); - return true; } @@ -965,7 +1217,7 @@ ck_rhs_get(struct ck_rhs *hs, unsigned long h, const void *key) { - struct ck_rhs_entry_desc *first; + long first; void *object; struct ck_rhs_map *map; unsigned long n_probes; @@ -979,8 +1231,8 @@ ck_rhs_get(struct ck_rhs *hs, probe = ck_rhs_map_bound_get(map, h); ck_pr_fence_load(); - first = NULL; - ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, probe, CK_RHS_PROBE_NO_RH); + first = -1; + map->probe_func(hs, map, &n_probes, &first, h, key, &object, probe, CK_RHS_PROBE_NO_RH); ck_pr_fence_load(); g_p = ck_pr_load_uint(generation); @@ -994,12 +1246,12 @@ ck_rhs_remove(struct ck_rhs *hs, unsigned long h, const void *key) { - struct ck_rhs_entry_desc *slot, *first; + long slot, first; void *object; struct ck_rhs_map *map = hs->map; unsigned long n_probes; - slot = ck_rhs_map_probe(hs, map, &n_probes, &first, h, key, &object, + slot = map->probe_func(hs, map, &n_probes, &first, h, key, &object, ck_rhs_map_bound_get(map, h), CK_RHS_PROBE_NO_RH); if (object == NULL) return NULL;