From f88d03b846f6e12caa5626862ddacb56f09a0a80 Mon Sep 17 00:00:00 2001 From: Samy Al Bahra Date: Wed, 7 Aug 2019 11:42:59 -0400 Subject: [PATCH] ck_ring: add a ck_ring_seek_* family of functions. These functions will forward consumer counter without paying cost of copying data out of ring buffer. --- include/ck_ring.h | 99 ++++++++++++++++++++- regressions/ck_ring/validate/ck_ring_mpmc.c | 31 ++++--- regressions/ck_ring/validate/ck_ring_spmc.c | 2 +- 3 files changed, 116 insertions(+), 16 deletions(-) diff --git a/include/ck_ring.h b/include/ck_ring.h index e5f0712..d019ecb 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -1,5 +1,5 @@ /* - * Copyright 2009-2015 Samy Al Bahra. + * Copyright 2009-2019 Samy Al Bahra. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -130,6 +130,21 @@ _ck_ring_enqueue_sp_size(struct ck_ring *ring, return r; } +CK_CC_FORCE_INLINE static bool +_ck_ring_seek_sc(struct ck_ring *ring) +{ + unsigned int consumer, producer; + + consumer = ring->c_head; + producer = ck_pr_load_uint(&ring->p_tail); + + if (CK_CC_UNLIKELY(consumer == producer)) + return false; + + ck_pr_store_uint(&ring->c_head, consumer + 1); + return true; +} + CK_CC_FORCE_INLINE static bool _ck_ring_dequeue_sc(struct ck_ring *ring, const void *CK_CC_RESTRICT buffer, @@ -264,6 +279,46 @@ _ck_ring_enqueue_mp_size(struct ck_ring *ring, return r; } +CK_CC_FORCE_INLINE static bool +_ck_ring_tryseek_mc(struct ck_ring *ring) +{ + unsigned int consumer, producer; + + consumer = ck_pr_load_uint(&ring->c_head); + ck_pr_fence_load(); + producer = ck_pr_load_uint(&ring->p_tail); + + if (CK_CC_UNLIKELY(consumer == producer)) + return false; + + return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1); +} + +CK_CC_FORCE_INLINE static bool +_ck_ring_seek_mc(struct ck_ring *ring) +{ + unsigned int consumer, producer; + + consumer = ck_pr_load_uint(&ring->c_head); + + do { + /* + * Producer counter must represent state relative to + * our latest consumer snapshot. + */ + ck_pr_fence_load(); + producer = ck_pr_load_uint(&ring->p_tail); + + if (CK_CC_UNLIKELY(consumer == producer)) + return false; + } while (ck_pr_cas_uint_value(&ring->c_head, + consumer, + consumer + 1, + &consumer) == false); + + return true; +} + CK_CC_FORCE_INLINE static bool _ck_ring_trydequeue_mc(struct ck_ring *ring, const void *buffer, @@ -354,6 +409,13 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, &entry, sizeof(entry), NULL); } +CK_CC_INLINE static bool +ck_ring_seek_spsc(struct ck_ring *ring) +{ + + return _ck_ring_seek_sc(ring); +} + CK_CC_INLINE static bool ck_ring_dequeue_spsc(struct ck_ring *ring, const struct ck_ring_buffer *buffer, @@ -410,6 +472,20 @@ ck_ring_dequeue_mpmc(struct ck_ring *ring, sizeof(void *)); } +CK_CC_INLINE static bool +ck_ring_tryseek_mpmc(struct ck_ring *ring) +{ + + return _ck_ring_tryseek_mc(ring); +} + +CK_CC_INLINE static bool +ck_ring_seek_mpmc(struct ck_ring *ring) +{ + + return _ck_ring_seek_mc(ring); +} + /* * The ck_ring_*_spmc namespace is the public interface for interacting with a * ring buffer containing pointers. Correctness is provided for any number of @@ -454,6 +530,20 @@ ck_ring_dequeue_spmc(struct ck_ring *ring, return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *)); } +CK_CC_INLINE static bool +ck_ring_tryseek_spmc(struct ck_ring *ring) +{ + + return _ck_ring_tryseek_mc(ring); +} + +CK_CC_INLINE static bool +ck_ring_seek_spmc(struct ck_ring *ring) +{ + + return _ck_ring_seek_mc(ring); +} + /* * The ck_ring_*_mpsc namespace is the public interface for interacting with a * ring buffer containing pointers. Correctness is provided for any number of @@ -480,6 +570,13 @@ ck_ring_enqueue_mpsc_size(struct ck_ring *ring, sizeof(entry), size); } +CK_CC_INLINE static bool +ck_ring_seek_mpsc(struct ck_ring *ring) +{ + + return _ck_ring_seek_sc(ring); +} + CK_CC_INLINE static bool ck_ring_dequeue_mpsc(struct ck_ring *ring, const struct ck_ring_buffer *buffer, diff --git a/regressions/ck_ring/validate/ck_ring_mpmc.c b/regressions/ck_ring/validate/ck_ring_mpmc.c index 66d7f39..dcc3a60 100644 --- a/regressions/ck_ring/validate/ck_ring_mpmc.c +++ b/regressions/ck_ring/validate/ck_ring_mpmc.c @@ -102,25 +102,28 @@ test_mpmc(void *c) if (ck_ring_dequeue_mpmc(&ring_mw, buffer, &o) == false) o = NULL; } else { + ck_ring_seek_mpmc(&ring_mw); if (ck_ring_trydequeue_mpmc(&ring_mw, buffer, &o) == false) o = NULL; } if (o == NULL) { - o = malloc(sizeof(*o)); - if (o == NULL) - continue; - - o->value_long = (unsigned long)ck_pr_faa_uint(&global_counter, 1) + 1; - - o->magic = 0xdead; - o->ref = 0; - o->tid = tid; - - if (ck_ring_enqueue_mpmc(&ring_mw, buffer, o) == false) { - free(o); - } else { - enqueue++; + for (size_t z = 0; z < 2; z++) { + o = malloc(sizeof(*o)); + if (o == NULL) + continue; + + o->value_long = (unsigned long)ck_pr_faa_uint(&global_counter, 1) + 1; + + o->magic = 0xdead; + o->ref = 0; + o->tid = tid; + + if (ck_ring_enqueue_mpmc(&ring_mw, buffer, o) == false) { + free(o); + } else { + enqueue++; + } } continue; diff --git a/regressions/ck_ring/validate/ck_ring_spmc.c b/regressions/ck_ring/validate/ck_ring_spmc.c index 161c0d8..e600b53 100644 --- a/regressions/ck_ring/validate/ck_ring_spmc.c +++ b/regressions/ck_ring/validate/ck_ring_spmc.c @@ -327,7 +327,7 @@ main(int argc, char *argv[]) } if ((int)s >= (size * ITERATIONS * (nthr - 1))) { - ck_error("MPMC: Unexpected size of %u\n", s); + ck_error("SPMC: Unexpected size of %u\n", s); } } }