diff --git a/src/ck_ht.c b/src/ck_ht.c index 4c92452..77ec249 100644 --- a/src/ck_ht.c +++ b/src/ck_ht.c @@ -209,7 +209,8 @@ ck_ht_map_probe_wr(struct ck_ht_map *map, ck_ht_entry_t **available, const void *key, uint16_t key_length, - uint64_t *probe_limit) + uint64_t *probe_limit, + uint64_t *probe_wr) { struct ck_ht_entry *bucket, *cursor; struct ck_ht_entry *first = NULL; @@ -245,8 +246,10 @@ retry: * the case of intermittent writers. */ if (cursor->key == CK_HT_KEY_TOMBSTONE) { - if (first == NULL) + if (first == NULL) { first = cursor; + *probe_wr = probes; + } continue; } @@ -296,8 +299,6 @@ retry: offset = ck_ht_map_probe_next(map, offset, h, probes); } - return NULL; - leave: *probe_limit = probes; *available = first; @@ -559,7 +560,7 @@ ck_ht_remove_spmc(ck_ht_t *table, { struct ck_ht_map *map; struct ck_ht_entry *candidate, *priority, snapshot; - uint64_t probes; + uint64_t probes, probes_wr; map = table->map; @@ -567,12 +568,12 @@ ck_ht_remove_spmc(ck_ht_t *table, candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority, ck_ht_entry_key(entry), ck_ht_entry_key_length(entry), - &probes); + &probes, &probes_wr); } else { candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority, (void *)entry->key, sizeof(entry->key), - &probes); + &probes, &probes_wr); } /* No matching entry was found. */ @@ -657,7 +658,7 @@ ck_ht_set_spmc(ck_ht_t *table, { struct ck_ht_entry snapshot, *candidate, *priority; struct ck_ht_map *map; - uint64_t probes; + uint64_t probes, probes_wr; for (;;) { map = table->map; @@ -666,12 +667,17 @@ ck_ht_set_spmc(ck_ht_t *table, candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority, ck_ht_entry_key(entry), ck_ht_entry_key_length(entry), - &probes); + &probes, &probes_wr); } else { candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority, (void *)entry->key, sizeof(entry->key), - &probes); + &probes, &probes_wr); + } + + if (priority != NULL) { + probes = probes_wr; + break; } if (candidate != NULL) @@ -684,7 +690,12 @@ ck_ht_set_spmc(ck_ht_t *table, if (probes > map->probe_maximum) ck_pr_store_64(&map->probe_maximum, probes); - if (candidate->key != CK_HT_KEY_EMPTY && priority != NULL) { + if (candidate == NULL) { + candidate = priority; + } + + if (candidate->key != CK_HT_KEY_EMPTY && + priority != NULL && candidate != priority) { /* * If we are replacing an existing entry and an earlier * tombstone was found in the probe sequence then replace @@ -708,7 +719,10 @@ ck_ht_set_spmc(ck_ht_t *table, } else { /* * In this case we are inserting a new entry or replacing - * an existing entry. + * an existing entry. There is no need to force a re-probe + * on tombstone replacement due to the fact that previous + * deletion counter update would have been published with + * respect to any concurrent probes. */ bool replace = candidate->key != CK_HT_KEY_EMPTY; @@ -750,7 +764,7 @@ ck_ht_put_spmc(ck_ht_t *table, { struct ck_ht_entry snapshot, *candidate, *priority; struct ck_ht_map *map; - uint64_t probes; + uint64_t probes, probes_wr; for (;;) { map = table->map; @@ -759,39 +773,38 @@ ck_ht_put_spmc(ck_ht_t *table, candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority, ck_ht_entry_key(entry), ck_ht_entry_key_length(entry), - &probes); + &probes, &probes_wr); } else { candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority, (void *)entry->key, sizeof(entry->key), - &probes); + &probes, &probes_wr); } - if (candidate != NULL) + if (candidate != NULL || priority != NULL) break; if (ck_ht_grow_spmc(table, map->capacity << 1) == false) return false; } - /* - * If the snapshot key is non-empty and the value field is not - * a tombstone then an identical key was found. As store does - * not implement replacement, we will fail. - */ - if (candidate->key != CK_HT_KEY_EMPTY && candidate->key != CK_HT_KEY_TOMBSTONE) + if (priority != NULL) { + /* Re-use tombstone if one was found. */ + candidate = priority; + probes = probes_wr; + } else if (candidate->key != CK_HT_KEY_EMPTY && + candidate->key != CK_HT_KEY_TOMBSTONE) { + /* + * If the snapshot key is non-empty and the value field is not + * a tombstone then an identical key was found. As store does + * not implement replacement, we will fail. + */ return false; + } if (probes > map->probe_maximum) ck_pr_store_64(&map->probe_maximum, probes); - /* - * If an earlier tombstone value was found, then store into that slot instead. - * It is earlier in the probe sequence to begin with. - */ - if (priority != NULL) - candidate = priority; - #ifdef CK_HT_PP ck_pr_store_ptr(&candidate->value, (void *)entry->value); ck_pr_fence_store();