ck_rwlock: Migrate to 32-bit primitives and add a write_latch operation.

write_latch allows writers to prevent incoming read acquisitions from
succeeding while still permitting write_lock operations to complete.
This is useful when multiple writers cooperate to linearize a
non-blocking data structure, and it allows the write lock holder to
copy lock state safely.

In collaboration with Paul Khuong and Jaidev Sridhar.
commit 210b724061
parent 671d067ea0
Author: Samy Al Bahra (11 years ago)
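For orientation, a minimal single-threaded sketch (not part of the commit) of
how a write lock holder might use the new latch operations. Nesting the latch
inside write_lock is an assumption about intended usage; only the ck_rwlock_*
calls below come from this header:

#include <assert.h>
#include <ck_rwlock.h>

static ck_rwlock_t lock = CK_RWLOCK_INITIALIZER;

int
main(void)
{

	ck_rwlock_write_lock(&lock);

	/* Block incoming read acquisitions until we unlatch. */
	ck_rwlock_write_latch(&lock);

	/* No reader can succeed here, so lock state may be copied safely. */
	assert(ck_rwlock_read_trylock(&lock) == false);

	ck_rwlock_write_unlatch(&lock);
	ck_rwlock_write_unlock(&lock);

	/* Once unlatched and unlocked, readers proceed as usual. */
	assert(ck_rwlock_read_trylock(&lock) == true);
	ck_rwlock_read_unlock(&lock);
	return 0;
}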

@@ -29,16 +29,18 @@
 
 #include <ck_elide.h>
 #include <ck_pr.h>
+#include <ck_stdint.h>
 #include <stdbool.h>
 #include <stddef.h>
 
 struct ck_rwlock {
-	unsigned int writer;
-	unsigned int n_readers;
+	uint32_t writer;
+	uint32_t n_readers;
 };
 typedef struct ck_rwlock ck_rwlock_t;
 
 #define CK_RWLOCK_INITIALIZER {0, 0}
+#define CK_RWLOCK_LATCH_SHIFT 16
 
 CK_CC_INLINE static void
 ck_rwlock_init(struct ck_rwlock *rw)
@@ -55,7 +57,7 @@ ck_rwlock_write_unlock(ck_rwlock_t *rw)
 {
 
 	ck_pr_fence_release();
-	ck_pr_store_uint(&rw->writer, 0);
+	ck_pr_store_32(&rw->writer, 0);
 	return;
 }
@@ -64,14 +66,14 @@ ck_rwlock_locked_writer(ck_rwlock_t *rw)
 {
 
 	ck_pr_fence_load();
-	return ck_pr_load_uint(&rw->writer);
+	return ck_pr_load_32(&rw->writer);
 }
 
 CK_CC_INLINE static void
 ck_rwlock_write_downgrade(ck_rwlock_t *rw)
 {
 
-	ck_pr_inc_uint(&rw->n_readers);
+	ck_pr_inc_32(&rw->n_readers);
 	ck_rwlock_write_unlock(rw);
 	return;
 }
@@ -82,22 +84,22 @@ ck_rwlock_locked(ck_rwlock_t *rw)
 	unsigned int r;
 
 	ck_pr_fence_load();
-	r = ck_pr_load_uint(&rw->writer);
+	r = ck_pr_load_32(&rw->writer);
 	ck_pr_fence_load();
-	return ck_pr_load_uint(&rw->n_readers) | r;
+	return ck_pr_load_32(&rw->n_readers) | r;
 }
 
 CK_CC_INLINE static bool
 ck_rwlock_write_trylock(ck_rwlock_t *rw)
 {
 
-	if (ck_pr_fas_uint(&rw->writer, 1) != 0)
+	if (ck_pr_fas_32(&rw->writer, 1) != 0)
 		return false;
 
 	ck_pr_fence_atomic_load();
 
-	if (ck_pr_load_uint(&rw->n_readers) != 0) {
+	if (ck_pr_load_32(&rw->n_readers) != 0) {
 		ck_rwlock_write_unlock(rw);
 		return false;
 	}
@@ -108,16 +110,38 @@ ck_rwlock_write_trylock(ck_rwlock_t *rw)
 CK_ELIDE_TRYLOCK_PROTOTYPE(ck_rwlock_write, ck_rwlock_t,
     ck_rwlock_locked, ck_rwlock_write_trylock)
 
+CK_CC_INLINE static void
+ck_rwlock_write_latch(ck_rwlock_t *rw)
+{
+	uint32_t snapshot = ck_pr_load_32(&rw->n_readers);
+	uint32_t delta = snapshot + (1UL << CK_RWLOCK_LATCH_SHIFT);
+
+	/* Publish the latch bit; retry until our CAS succeeds. */
+	while (ck_pr_cas_32_value(&rw->n_readers, snapshot, delta,
+	    &snapshot) == false) {
+		delta = snapshot + (1UL << CK_RWLOCK_LATCH_SHIFT);
+		ck_pr_stall();
+	}
+
+	return;
+}
+
+CK_CC_INLINE static void
+ck_rwlock_write_unlatch(ck_rwlock_t *rw)
+{
+
+	/* Clear the latch bit published by ck_rwlock_write_latch. */
+	ck_pr_sub_32(&rw->n_readers, 1UL << CK_RWLOCK_LATCH_SHIFT);
+	return;
+}
+
 CK_CC_INLINE static void
 ck_rwlock_write_lock(ck_rwlock_t *rw)
 {
 
-	while (ck_pr_fas_uint(&rw->writer, 1) != 0)
+	while (ck_pr_fas_32(&rw->writer, 1) != 0)
 		ck_pr_stall();
 
 	ck_pr_fence_atomic_load();
 
-	while (ck_pr_load_uint(&rw->n_readers) != 0)
+	while (ck_pr_load_32(&rw->n_readers) != 0)
 		ck_pr_stall();
 
 	return;
@@ -131,10 +155,10 @@ CK_CC_INLINE static bool
 ck_rwlock_read_trylock(ck_rwlock_t *rw)
 {
 
-	if (ck_pr_load_uint(&rw->writer) != 0)
+	if (ck_pr_load_32(&rw->writer) != 0)
 		return false;
 
-	ck_pr_inc_uint(&rw->n_readers);
+	ck_pr_inc_32(&rw->n_readers);
 
 	/*
 	 * Serialize with respect to concurrent write
@@ -142,12 +166,12 @@ ck_rwlock_read_trylock(ck_rwlock_t *rw)
 	 */
 	ck_pr_fence_atomic_load();
 
-	if (ck_pr_load_uint(&rw->writer) == 0) {
+	if (ck_pr_load_32(&rw->writer) == 0) {
 		ck_pr_fence_load();
 		return true;
 	}
 
-	ck_pr_dec_uint(&rw->n_readers);
+	ck_pr_dec_32(&rw->n_readers);
 	return false;
 }
@@ -157,12 +181,21 @@ CK_ELIDE_TRYLOCK_PROTOTYPE(ck_rwlock_read, ck_rwlock_t,
 CK_CC_INLINE static void
 ck_rwlock_read_lock(ck_rwlock_t *rw)
 {
+	uint32_t snapshot;
 
 	for (;;) {
-		while (ck_pr_load_uint(&rw->writer) != 0)
+		while (ck_pr_load_32(&rw->writer) != 0)
 			ck_pr_stall();
 
-		ck_pr_inc_uint(&rw->n_readers);
+		snapshot = ck_pr_faa_32(&rw->n_readers, 1);
+		if (snapshot >> CK_RWLOCK_LATCH_SHIFT) {
+			/* Back our increment out, then wait for the latch to clear. */
+			ck_pr_dec_32(&rw->n_readers);
+			do {
+				ck_pr_stall();
+				snapshot = ck_pr_load_32(&rw->n_readers);
+			} while (snapshot >> CK_RWLOCK_LATCH_SHIFT);
+			continue;
+		}
 
 		/*
 		 * Serialize with respect to concurrent write
@@ -170,10 +203,10 @@ ck_rwlock_read_lock(ck_rwlock_t *rw)
 		 */
 		ck_pr_fence_atomic_load();
 
-		if (ck_pr_load_uint(&rw->writer) == 0)
+		if (ck_pr_load_32(&rw->writer) == 0)
 			break;
-		ck_pr_dec_uint(&rw->n_readers);
+		ck_pr_dec_32(&rw->n_readers);
 	}
 
 	/* Acquire semantics are necessary. */
@@ -186,7 +219,7 @@ ck_rwlock_locked_reader(ck_rwlock_t *rw)
 {
 
 	ck_pr_fence_load();
-	return ck_pr_load_uint(&rw->n_readers);
+	return ck_pr_load_32(&rw->n_readers);
 }
 
 CK_CC_INLINE static void
@@ -194,7 +227,7 @@ ck_rwlock_read_unlock(ck_rwlock_t *rw)
 {
 
 	ck_pr_fence_load_atomic();
-	ck_pr_dec_uint(&rw->n_readers);
+	ck_pr_dec_32(&rw->n_readers);
 	return;
 }
@@ -218,16 +251,16 @@ ck_rwlock_recursive_write_lock(ck_rwlock_recursive_t *rw, unsigned int tid)
 {
 	unsigned int o;
 
-	o = ck_pr_load_uint(&rw->rw.writer);
+	o = ck_pr_load_32(&rw->rw.writer);
 	if (o == tid)
 		goto leave;
 
-	while (ck_pr_cas_uint(&rw->rw.writer, 0, tid) == false)
+	while (ck_pr_cas_32(&rw->rw.writer, 0, tid) == false)
 		ck_pr_stall();
 
 	ck_pr_fence_atomic_load();
 
-	while (ck_pr_load_uint(&rw->rw.n_readers) != 0)
+	while (ck_pr_load_32(&rw->rw.n_readers) != 0)
 		ck_pr_stall();
 
 leave:
@@ -240,17 +273,17 @@ ck_rwlock_recursive_write_trylock(ck_rwlock_recursive_t *rw, unsigned int tid)
 {
 	unsigned int o;
 
-	o = ck_pr_load_uint(&rw->rw.writer);
+	o = ck_pr_load_32(&rw->rw.writer);
 	if (o == tid)
 		goto leave;
 
-	if (ck_pr_cas_uint(&rw->rw.writer, 0, tid) == false)
+	if (ck_pr_cas_32(&rw->rw.writer, 0, tid) == false)
 		return false;
 
 	ck_pr_fence_atomic_load();
 
-	if (ck_pr_load_uint(&rw->rw.n_readers) != 0) {
-		ck_pr_store_uint(&rw->rw.writer, 0);
+	if (ck_pr_load_32(&rw->rw.n_readers) != 0) {
+		ck_pr_store_32(&rw->rw.writer, 0);
 		return false;
 	}
@@ -265,7 +298,7 @@ ck_rwlock_recursive_write_unlock(ck_rwlock_recursive_t *rw)
 
 	if (--rw->wc == 0) {
 		ck_pr_fence_release();
-		ck_pr_store_uint(&rw->rw.writer, 0);
+		ck_pr_store_32(&rw->rw.writer, 0);
 	}
 
 	return;
@@ -294,5 +327,6 @@ ck_rwlock_recursive_read_unlock(ck_rwlock_recursive_t *rw)
 	return;
 }
 
+#undef CK_RWLOCK_LATCH_SHIFT
 #endif /* _CK_RWLOCK_H */
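A note on the encoding the latch relies on (my reading of the diff, not text
from the commit): n_readers keeps the active reader count in its low
CK_RWLOCK_LATCH_SHIFT bits, while write_latch and write_unlatch add and remove
one unit above them, so the read_lock test `snapshot >> CK_RWLOCK_LATCH_SHIFT`
is nonzero exactly while a latch is held. A standalone illustration:

#include <stdint.h>
#include <stdio.h>

#define LATCH_SHIFT 16	/* stands in for CK_RWLOCK_LATCH_SHIFT */

int
main(void)
{
	uint32_t n_readers = 3;	/* three readers hold the lock */

	/* write_latch: add one unit above the reader count. */
	n_readers += (uint32_t)1 << LATCH_SHIFT;
	printf("latched=%u readers=%u\n",
	    (unsigned)(n_readers >> LATCH_SHIFT),
	    (unsigned)(n_readers & (((uint32_t)1 << LATCH_SHIFT) - 1)));

	/* write_unlatch: remove it; the reader count is untouched. */
	n_readers -= (uint32_t)1 << LATCH_SHIFT;
	printf("latched=%u readers=%u\n",
	    (unsigned)(n_readers >> LATCH_SHIFT),
	    (unsigned)(n_readers & (((uint32_t)1 << LATCH_SHIFT) - 1)));
	return 0;
}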
