diff --git a/include/ck_ring.h b/include/ck_ring.h index d03240a..b96d354 100644 --- a/include/ck_ring.h +++ b/include/ck_ring.h @@ -53,6 +53,7 @@ struct type *buffer, \ unsigned int size) \ { \ + \ ck_pr_store_uint(&ring->size, size); \ ck_pr_store_uint(&ring->mask, size - 1); \ ck_pr_store_uint(&ring->p_tail, 0); \ @@ -82,13 +83,13 @@ unsigned int consumer, producer; \ \ consumer = ck_pr_load_uint(&ring->c_head); \ - producer = ck_pr_load_uint(&ring->p_tail); \ + producer = ring->p_tail; \ \ if (((producer + 1) & ring->mask) == consumer) \ return (false); \ \ ring->ring[producer] = *entry; \ - ck_pr_fence_store(); \ + ck_pr_fence_memory(); \ ck_pr_store_uint(&ring->p_tail, (producer + 1) & ring->mask); \ \ return (true); \ @@ -99,7 +100,7 @@ { \ unsigned int consumer, producer; \ \ - consumer = ck_pr_load_uint(&ring->c_head); \ + consumer = ring->c_head; \ producer = ck_pr_load_uint(&ring->p_tail); \ \ if (consumer == producer) \ @@ -154,7 +155,7 @@ CK_CC_INLINE static unsigned int ck_ring_capacity(struct ck_ring *ring) { - return ck_pr_load_uint(&ring->size); + return ring->size; } CK_CC_INLINE static bool @@ -163,15 +164,19 @@ ck_ring_enqueue_spsc(struct ck_ring *ring, void *entry) unsigned int consumer, producer; consumer = ck_pr_load_uint(&ring->c_head); - producer = ck_pr_load_uint(&ring->p_tail); + producer = ring->p_tail; if (((producer + 1) & ring->mask) == consumer) return (false); - ck_pr_store_ptr(&ring->ring[producer], entry); + ring->ring[producer] = entry; + + /* + * Make sure to update slot value before indicating + * that the slot is available for consumption. + */ ck_pr_fence_store(); ck_pr_store_uint(&ring->p_tail, (producer + 1) & ring->mask); - return (true); } @@ -182,19 +187,33 @@ CK_CC_INLINE static bool ck_ring_dequeue_spsc(struct ck_ring *ring, void *data) { unsigned int consumer, producer; - void *value; - consumer = ck_pr_load_uint(&ring->c_head); + consumer = ring->c_head; producer = ck_pr_load_uint(&ring->p_tail); if (consumer == producer) return (false); + /* + * Make sure to serialize with respect to our snapshot + * of the producer counter. + */ ck_pr_fence_load(); - value = ck_pr_load_ptr(ring->ring + consumer); - ck_pr_store_ptr(data, value); - ck_pr_store_uint(&ring->c_head, (consumer + 1) & ring->mask); + /* + * This is used to work-around aliasing issues (C + * lacks a generic pointer to pointer despite it + * being a reality on POSIX). This interface is + * troublesome on platforms where sizeof(void *) + * is not guaranteed to be sizeof(T *). + * + * It is guaranteed that the value of + * ring->ring[consumer] is either in *data or aliased + * to some location whose value is to be stored in data + * before this operation completes. + */ + ck_pr_store_ptr(data, ring->ring[consumer]); + ck_pr_store_uint(&ring->c_head, (consumer + 1) & ring->mask); return (true); }