diff --git a/include/ck_ring.h b/include/ck_ring.h index 71aa66b..a9dc373 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -159,6 +159,66 @@ ck_ring_capacity(struct ck_ring *ring) return ring->size; } +CK_CC_INLINE static bool +ck_ring_enqueue_mpmc(struct ck_ring *ring, void *entry) +{ + unsigned int consumer, producer, delta; + bool success; + void *r; + + producer = ck_pr_load_uint(&ring->p_tail); + + do { + consumer = ck_pr_load_uint(&ring->c_head); + delta = (producer + 1) & ring->mask; + if (delta == consumer) + return false; + + /* Speculate slot availability. */ + r = ck_pr_load_ptr(&ring->ring[producer]); + success = ck_pr_cas_ptr(&ring->ring[producer], r, entry); + + /* Publish value before publishing counter update. */ + ck_pr_fence_store(); + + /* This is the linearization point. */ + ck_pr_cas_uint_value(&ring->p_tail, + producer, + delta, + &producer); + } while (success == false); + + return true; +} + +CK_CC_INLINE static bool +ck_ring_dequeue_mpmc(struct ck_ring *ring, void *data) +{ + unsigned int consumer, producer; + void *r; + + consumer = ck_pr_load_uint(&ring->c_head); + + do { + producer = ck_pr_load_uint(&ring->p_tail); + + if (consumer == producer) + return false; + + ck_pr_fence_load(); + r = ck_pr_load_ptr(&ring->ring[consumer]); + + /* Serialize load with respect to head update. */ + ck_pr_fence_memory(); + } while (ck_pr_cas_uint_value(&ring->c_head, + consumer, + (consumer + 1) & ring->mask, + &consumer) == false); + + ck_pr_store_ptr(data, r); + return true; +} + CK_CC_INLINE static bool ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry) { diff --git a/regressions/ck_ring/validate/Makefile b/regressions/ck_ring/validate/Makefile index 1319556..787625a 100644 --- a/regressions/ck_ring/validate/Makefile +++ b/regressions/ck_ring/validate/Makefile @@ -1,12 +1,13 @@ .PHONY: check clean distribution -OBJECTS=ck_ring_spsc ck_ring_spsc_template +OBJECTS=ck_ring_spsc ck_ring_spsc_template ck_ring_mpmc all: $(OBJECTS) check: all ./ck_ring_spsc $(CORES) 1 65536 ./ck_ring_spsc_template $(CORES) 1 65536 + ./ck_ring_mpmc $(CORES) 1 65536 ck_ring_spsc_template: ck_ring_spsc_template.c ../../../include/ck_ring.h $(CC) $(CFLAGS) -o ck_ring_spsc_template ck_ring_spsc_template.c @@ -14,6 +15,9 @@ ck_ring_spsc_template: ck_ring_spsc_template.c ../../../include/ck_ring.h ck_ring_spsc: ck_ring_spsc.c ../../../include/ck_ring.h $(CC) $(CFLAGS) -o ck_ring_spsc ck_ring_spsc.c +ck_ring_mpmc: ck_ring_mpmc.c ../../../include/ck_ring.h + $(CC) $(CFLAGS) -o ck_ring_mpmc ck_ring_mpmc.c + clean: rm -rf *~ *.o $(OBJECTS) *.dSYM