ck_ht: Various performance improvements.

Though a new implementation is in the works, roll in
some performance improvements in the meantime.
The probe routines have been broken out into separate
reader/writer variants. These variants are much less
branch-intensive (and avoid branch-misprediction stalls
in many cases).

The new implementation attempts to deal with
interface-induced overheads.
Samy Al Bahra 13 years ago
parent e1ec55819e
commit 1a8b3db453
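For context on the reader/writer split described in the message above, here is a minimal sketch (not the ck_ht code; all names are hypothetical) of how probe routines can be specialized per role in an open-addressed table: the read-side probe takes a fenced snapshot of each slot and gives up once it exceeds the table's recorded probe maximum, while the single-writer probe reads slots directly, remembers the first tombstone it encounters for reuse, and reports its probe count so the maximum can be maintained. C11 acquire loads stand in for the ck_pr load fences.

/*
 * Hypothetical sketch of role-specialized probing; not ck_ht itself.
 * C11 acquire loads approximate the load fences used by the real code.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SLOT_EMPTY     ((uintptr_t)0)
#define SLOT_TOMBSTONE (~(uintptr_t)0)

struct slot {
	_Atomic uintptr_t key;
	_Atomic uintptr_t value;
};

struct table {
	struct slot *slots;
	size_t mask;            /* capacity - 1; capacity is a power of two */
	uint64_t probe_maximum; /* longest probe sequence ever written */
};

/*
 * Read-side probe: snapshot each slot with acquire ordering and stop
 * once the longest known probe sequence has been exceeded.
 */
static bool
probe_rd(const struct table *t, uintptr_t key, uintptr_t *value)
{
	size_t offset = (size_t)key & t->mask;
	uint64_t probes;

	for (probes = 0; probes <= t->probe_maximum; probes++) {
		struct slot *s = &t->slots[offset];
		uintptr_t k = atomic_load_explicit(&s->key, memory_order_acquire);

		if (k == SLOT_EMPTY)
			return false;

		if (k == key) {
			*value = atomic_load_explicit(&s->value, memory_order_acquire);
			return true;
		}

		offset = (offset + 1) & t->mask;
	}

	return false;
}

/*
 * Write-side probe (single writer): no snapshot fences are needed.
 * Remember the first tombstone so the caller can reuse that slot, and
 * report the probe count so probe_maximum can be updated afterwards.
 */
static struct slot *
probe_wr(struct table *t, uintptr_t key, struct slot **available,
    uint64_t *n_probes)
{
	size_t offset = (size_t)key & t->mask;
	struct slot *first = NULL;
	uint64_t probes = 0;
	size_t i;

	for (i = 0; i <= t->mask; i++) {
		struct slot *s = &t->slots[offset];
		uintptr_t k = atomic_load_explicit(&s->key, memory_order_relaxed);

		probes++;
		if (k == SLOT_TOMBSTONE) {
			if (first == NULL)
				first = s;
		} else if (k == SLOT_EMPTY || k == key) {
			*available = first;
			*n_probes = probes;
			return s;
		}

		offset = (offset + 1) & t->mask;
	}

	*available = first;
	*n_probes = probes;
	return NULL;
}

The real read path in the diff below additionally snapshots a per-map deletions counter and restarts the probe if it changes mid-read; that detail is omitted from the sketch for brevity.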

@@ -18,3 +18,4 @@ clean:
include ../../../build/regressions.build
CFLAGS+=-D_GNU_SOURCE

@@ -111,13 +111,15 @@ table_get(const char *value)
ck_ht_entry_t entry;
ck_ht_hash_t h;
size_t l = strlen(value);
void *v = NULL;
ck_ht_hash(&h, &ht, value, l);
ck_ht_entry_key_set(&entry, value, l);
if (ck_ht_get_spmc(&ht, h, &entry) == true)
return ck_ht_entry_value(&entry);
return NULL;
if (ck_ht_get_spmc(&ht, h, &entry) == true) {
v = ck_ht_entry_value(&entry);
}
return v;
}
static bool
@@ -279,6 +281,10 @@ main(int argc, char *argv[])
}
sr = a / (r * keys_length);
table_reset();
for (i = 0; i < keys_length; i++)
table_insert(keys[i]);
a = 0;
for (j = 0; j < r; j++) {
s = rdtsc();

@@ -152,11 +152,12 @@ ck_ht_map_destroy(struct ck_malloc *m, struct ck_ht_map *map, bool defer)
}
static inline size_t
ck_ht_map_probe_next(struct ck_ht_map *map, size_t offset, ck_ht_hash_t h)
ck_ht_map_probe_next(struct ck_ht_map *map, size_t offset, ck_ht_hash_t h, size_t probes)
{
ck_ht_hash_t r;
size_t stride;
(void)probes;
r.value = h.value >> map->step;
stride = (r.value & ~CK_HT_BUCKET_MASK) << 1
| (r.value & CK_HT_BUCKET_MASK);
@@ -191,19 +192,18 @@ ck_ht_init(ck_ht_t *table,
}
static struct ck_ht_entry *
ck_ht_map_probe(struct ck_ht_map *map,
ck_ht_hash_t h,
ck_ht_entry_t *snapshot,
ck_ht_entry_t **available,
const void *key,
uint16_t key_length,
uint64_t *probe_limit)
ck_ht_map_probe_wr(struct ck_ht_map *map,
ck_ht_hash_t h,
ck_ht_entry_t *snapshot,
ck_ht_entry_t **available,
const void *key,
uint16_t key_length,
uint64_t *probe_limit)
{
struct ck_ht_entry *bucket, *cursor;
struct ck_ht_entry *first = NULL;
size_t offset, i, j;
uint64_t probes = 0;
uint64_t probe_maximum;
#ifndef CK_HT_PP
uint64_t d = 0;
@@ -211,7 +211,6 @@ ck_ht_map_probe(struct ck_ht_map *map,
retry:
#endif
probe_maximum = ck_pr_load_64(&map->probe_maximum);
offset = h.value & map->mask;
for (i = 0; i < map->probe_limit; i++) {
@@ -227,51 +226,24 @@ retry:
uint16_t k;
probes++;
if (probe_limit == NULL && probes > probe_maximum)
return NULL;
cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));
/*
* Technically, we should probably lift this to a separate probe
* function. A lot of complexity in here belongs only for the
* reader. However, assuming a reasonable BTB we can attempt to
* at least avoid fence costs for the writer until we decide
* it is worth the code duplication.
*/
if (probe_limit == NULL) {
#ifdef CK_HT_PP
snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
ck_pr_fence_load();
snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#else
d = ck_pr_load_64(&map->deletions);
snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
ck_pr_fence_load();
snapshot->key_length = ck_pr_load_64(&cursor->key_length);
snapshot->hash = ck_pr_load_64(&cursor->hash);
snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#endif
} else {
*snapshot = *cursor;
}
/*
* It is probably worth it to encapsulate probe state
* in order to prevent a complete reprobe sequence in
* the case of intermittent writers.
*/
if (snapshot->key == CK_HT_KEY_TOMBSTONE) {
if (cursor->key == CK_HT_KEY_TOMBSTONE) {
if (first == NULL)
first = cursor;
continue;
}
if (snapshot->key == CK_HT_KEY_EMPTY)
if (cursor->key == CK_HT_KEY_EMPTY)
goto leave;
if (snapshot->key == (uintptr_t)key)
if (cursor->key == (uintptr_t)key)
goto leave;
if (map->mode == CK_HT_MODE_BYTESTRING) {
@@ -281,15 +253,15 @@ retry:
* Check memoized portion of hash value before
* expensive full-length comparison.
*/
k = ck_ht_entry_key_length(snapshot);
k = ck_ht_entry_key_length(cursor);
if (k != key_length)
continue;
#ifdef CK_HT_PP
if (snapshot->value >> 48 != ((h.value >> 32) & 0xFFFF))
if (cursor->value >> 48 != ((h.value >> 32) & 0xFFFF))
continue;
#else
if (snapshot->hash != h.value)
if (cursor->hash != h.value)
continue;
if (probe_limit == NULL) {
@@ -304,24 +276,130 @@ retry:
}
#endif
pointer = ck_ht_entry_key(snapshot);
pointer = ck_ht_entry_key(cursor);
if (memcmp(pointer, key, key_length) == 0)
goto leave;
}
}
offset = ck_ht_map_probe_next(map, offset, h);
offset = ck_ht_map_probe_next(map, offset, h, probes);
}
return NULL;
leave:
if (probe_limit != NULL)
*probe_limit = probes;
*probe_limit = probes;
*available = first;
*snapshot = *cursor;
if (available != NULL)
*available = first;
return cursor;
}
static struct ck_ht_entry *
ck_ht_map_probe_rd(struct ck_ht_map *map,
ck_ht_hash_t h,
ck_ht_entry_t *snapshot,
const void *key,
uint16_t key_length)
{
struct ck_ht_entry *bucket, *cursor;
size_t offset, i, j;
uint64_t probes = 0;
uint64_t probe_maximum;
#ifndef CK_HT_PP
uint64_t d = 0;
uint64_t d_prime = 0;
retry:
#endif
probe_maximum = ck_pr_load_64(&map->probe_maximum);
offset = h.value & map->mask;
for (i = 0; i < map->probe_limit; i++) {
/*
* Probe on a complete cache line first. Scan forward and wrap around to
* the beginning of the cache line. Only when the complete cache line has
* been scanned do we move on to the next row.
*/
bucket = (void *)((uintptr_t)(map->entries + offset) &
~(CK_MD_CACHELINE - 1));
for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
uint16_t k;
probes++;
cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));
#ifdef CK_HT_PP
snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
ck_pr_fence_load();
snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#else
d = ck_pr_load_64(&map->deletions);
snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
ck_pr_fence_load();
snapshot->key_length = ck_pr_load_64(&cursor->key_length);
snapshot->hash = ck_pr_load_64(&cursor->hash);
snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#endif
/*
* It is probably worth it to encapsulate probe state
* in order to prevent a complete reprobe sequence in
* the case of intermittent writers.
*/
if (snapshot->key == CK_HT_KEY_TOMBSTONE)
continue;
if (snapshot->key == CK_HT_KEY_EMPTY)
goto leave;
if (snapshot->key == (uintptr_t)key)
goto leave;
if (map->mode == CK_HT_MODE_BYTESTRING) {
void *pointer;
/*
* Check memoized portion of hash value before
* expensive full-length comparison.
*/
k = ck_ht_entry_key_length(snapshot);
if (k != key_length)
continue;
#ifdef CK_HT_PP
if (snapshot->value >> 48 != ((h.value >> 32) & 0xFFFF))
continue;
#else
if (snapshot->hash != h.value)
continue;
d_prime = ck_pr_load_64(&map->deletions);
/*
* It is possible that the slot was
* replaced, initiate a re-probe.
*/
if (d != d_prime)
goto retry;
#endif
pointer = ck_ht_entry_key(snapshot);
if (memcmp(pointer, key, key_length) == 0)
goto leave;
}
}
if (probes > probe_maximum)
return NULL;
offset = ck_ht_map_probe_next(map, offset, h, probes);
}
return NULL;
leave:
return cursor;
}
@@ -441,7 +519,7 @@ restart:
if (j < CK_HT_BUCKET_LENGTH)
break;
offset = ck_ht_map_probe_next(update, offset, h);
offset = ck_ht_map_probe_next(update, offset, h, probes);
}
if (i == update->probe_limit) {
@@ -467,16 +545,21 @@ ck_ht_remove_spmc(ck_ht_t *table,
ck_ht_entry_t *entry)
{
struct ck_ht_map *map;
struct ck_ht_entry *candidate, snapshot;
struct ck_ht_entry *candidate, *priority, snapshot;
uint64_t probes;
map = table->map;
if (table->mode == CK_HT_MODE_BYTESTRING) {
candidate = ck_ht_map_probe(map, h, &snapshot, NULL,
ck_ht_entry_key(entry), ck_ht_entry_key_length(entry), NULL);
candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
ck_ht_entry_key(entry),
ck_ht_entry_key_length(entry),
&probes);
} else {
candidate = ck_ht_map_probe(map, h, &snapshot, NULL,
(void *)entry->key, sizeof(entry->key), NULL);
candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
(void *)entry->key,
sizeof(entry->key),
&probes);
}
/* No matching entry was found. */
@@ -508,7 +591,6 @@ ck_ht_remove_spmc(ck_ht_t *table,
ck_pr_store_64(&map->deletions, map->deletions + 1);
ck_pr_fence_store();
ck_pr_store_64(&map->n_entries, map->n_entries - 1);
return true;
}
@@ -531,11 +613,11 @@ restart:
d = ck_pr_load_64(&map->deletions);
if (table->mode == CK_HT_MODE_BYTESTRING) {
candidate = ck_ht_map_probe(map, h, &snapshot, NULL,
ck_ht_entry_key(entry), ck_ht_entry_key_length(entry), NULL);
candidate = ck_ht_map_probe_rd(map, h, &snapshot,
ck_ht_entry_key(entry), ck_ht_entry_key_length(entry));
} else {
candidate = ck_ht_map_probe(map, h, &snapshot, NULL,
(void *)entry->key, sizeof(entry->key), NULL);
candidate = ck_ht_map_probe_rd(map, h, &snapshot,
(void *)entry->key, sizeof(entry->key));
}
d_prime = ck_pr_load_64(&map->deletions);
@@ -568,12 +650,12 @@ ck_ht_set_spmc(ck_ht_t *table,
map = table->map;
if (table->mode == CK_HT_MODE_BYTESTRING) {
candidate = ck_ht_map_probe(map, h, &snapshot, &priority,
candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
ck_ht_entry_key(entry),
ck_ht_entry_key_length(entry),
&probes);
} else {
candidate = ck_ht_map_probe(map, h, &snapshot, &priority,
candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
(void *)entry->key,
sizeof(entry->key),
&probes);
@@ -661,12 +743,12 @@ ck_ht_put_spmc(ck_ht_t *table,
map = table->map;
if (table->mode == CK_HT_MODE_BYTESTRING) {
candidate = ck_ht_map_probe(map, h, &snapshot, &priority,
candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
ck_ht_entry_key(entry),
ck_ht_entry_key_length(entry),
&probes);
} else {
candidate = ck_ht_map_probe(map, h, &snapshot, &priority,
candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
(void *)entry->key,
sizeof(entry->key),
&probes);
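An aside on the probe schedule shared by both variants above: the comment in ck_ht_map_probe_rd explains that each probe examines a full cache line of entries, scanning forward from the hashed slot and wrapping within the line, before ck_ht_map_probe_next derives the next row offset from higher-order hash bits; the new probes argument is accepted but unused for now. Below is a hedged sketch of that schedule in isolation; the constants, struct layout, and helper names are placeholders, not the ck_ht API, and the exact stride arithmetic is an assumption.

/*
 * Hypothetical restatement of the cache-line-at-a-time probe schedule;
 * constants, layout, and helper names are placeholders, not the ck API.
 */
#include <stddef.h>
#include <stdint.h>

#define CACHELINE     64
#define BUCKET_LENGTH (CACHELINE / sizeof(struct entry))

struct entry {
	uintptr_t key;   /* 0 is treated as the empty sentinel here */
	uintptr_t value;
};

struct map {
	struct entry *entries;
	size_t mask;   /* number of entries - 1; table size is a power of two */
	unsigned step; /* shift selecting the stride bits of the hash */
};

/*
 * Next row: derive a stride from high-order hash bits so that keys
 * colliding on one cache line diverge on later probes. The probe count
 * is accepted but unused, mirroring an interface that leaves room for
 * probe-dependent schedules.
 */
static size_t
probe_next(const struct map *m, size_t offset, uint64_t hash, uint64_t probes)
{
	uint64_t r = hash >> m->step;
	size_t stride = (size_t)(((r & ~(uint64_t)(BUCKET_LENGTH - 1)) << 1) |
	    (r & (BUCKET_LENGTH - 1)));

	(void)probes;
	return (offset + (stride | 1)) & m->mask;
}

static struct entry *
probe(struct map *m, uint64_t hash, uintptr_t key, size_t probe_limit)
{
	size_t offset = (size_t)(hash & m->mask);
	size_t i, j;

	for (i = 0; i < probe_limit; i++) {
		/* Round the slot address down to its cache line. */
		struct entry *bucket = (struct entry *)((uintptr_t)(m->entries + offset) &
		    ~((uintptr_t)CACHELINE - 1));

		/*
		 * Scan every slot in that line, starting at the hashed slot
		 * and wrapping within the line, before moving to another row.
		 */
		for (j = 0; j < BUCKET_LENGTH; j++) {
			struct entry *cursor = bucket + ((offset + j) & (BUCKET_LENGTH - 1));

			if (cursor->key == key || cursor->key == 0)
				return cursor;
		}

		offset = probe_next(m, offset, hash, i);
	}

	return NULL;
}

Confining each probe to a single cache line keeps the common lookup to one line fill, while the hash-derived stride spreads subsequent rows across the table.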
