ck_ring: add a ck_ring_seek_* family of functions.

These functions will forward consumer counter without paying cost of
copying data out of ring buffer.
awsm
Samy Al Bahra 6 years ago
parent 7be41960ab
commit f88d03b846

@ -1,5 +1,5 @@
/*
* Copyright 2009-2015 Samy Al Bahra.
* Copyright 2009-2019 Samy Al Bahra.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -130,6 +130,21 @@ _ck_ring_enqueue_sp_size(struct ck_ring *ring,
return r;
}
CK_CC_FORCE_INLINE static bool
_ck_ring_seek_sc(struct ck_ring *ring)
{
unsigned int consumer, producer;
consumer = ring->c_head;
producer = ck_pr_load_uint(&ring->p_tail);
if (CK_CC_UNLIKELY(consumer == producer))
return false;
ck_pr_store_uint(&ring->c_head, consumer + 1);
return true;
}
CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring *ring,
const void *CK_CC_RESTRICT buffer,
@ -264,6 +279,46 @@ _ck_ring_enqueue_mp_size(struct ck_ring *ring,
return r;
}
CK_CC_FORCE_INLINE static bool
_ck_ring_tryseek_mc(struct ck_ring *ring)
{
unsigned int consumer, producer;
consumer = ck_pr_load_uint(&ring->c_head);
ck_pr_fence_load();
producer = ck_pr_load_uint(&ring->p_tail);
if (CK_CC_UNLIKELY(consumer == producer))
return false;
return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
}
CK_CC_FORCE_INLINE static bool
_ck_ring_seek_mc(struct ck_ring *ring)
{
unsigned int consumer, producer;
consumer = ck_pr_load_uint(&ring->c_head);
do {
/*
* Producer counter must represent state relative to
* our latest consumer snapshot.
*/
ck_pr_fence_load();
producer = ck_pr_load_uint(&ring->p_tail);
if (CK_CC_UNLIKELY(consumer == producer))
return false;
} while (ck_pr_cas_uint_value(&ring->c_head,
consumer,
consumer + 1,
&consumer) == false);
return true;
}
CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring *ring,
const void *buffer,
@ -354,6 +409,13 @@ ck_ring_enqueue_spsc(struct ck_ring *ring,
&entry, sizeof(entry), NULL);
}
CK_CC_INLINE static bool
ck_ring_seek_spsc(struct ck_ring *ring)
{
return _ck_ring_seek_sc(ring);
}
CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,
@ -410,6 +472,20 @@ ck_ring_dequeue_mpmc(struct ck_ring *ring,
sizeof(void *));
}
CK_CC_INLINE static bool
ck_ring_tryseek_mpmc(struct ck_ring *ring)
{
return _ck_ring_tryseek_mc(ring);
}
CK_CC_INLINE static bool
ck_ring_seek_mpmc(struct ck_ring *ring)
{
return _ck_ring_seek_mc(ring);
}
/*
* The ck_ring_*_spmc namespace is the public interface for interacting with a
* ring buffer containing pointers. Correctness is provided for any number of
@ -454,6 +530,20 @@ ck_ring_dequeue_spmc(struct ck_ring *ring,
return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
}
CK_CC_INLINE static bool
ck_ring_tryseek_spmc(struct ck_ring *ring)
{
return _ck_ring_tryseek_mc(ring);
}
CK_CC_INLINE static bool
ck_ring_seek_spmc(struct ck_ring *ring)
{
return _ck_ring_seek_mc(ring);
}
/*
* The ck_ring_*_mpsc namespace is the public interface for interacting with a
* ring buffer containing pointers. Correctness is provided for any number of
@ -480,6 +570,13 @@ ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
sizeof(entry), size);
}
CK_CC_INLINE static bool
ck_ring_seek_mpsc(struct ck_ring *ring)
{
return _ck_ring_seek_sc(ring);
}
CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring *ring,
const struct ck_ring_buffer *buffer,

@ -102,25 +102,28 @@ test_mpmc(void *c)
if (ck_ring_dequeue_mpmc(&ring_mw, buffer, &o) == false)
o = NULL;
} else {
ck_ring_seek_mpmc(&ring_mw);
if (ck_ring_trydequeue_mpmc(&ring_mw, buffer, &o) == false)
o = NULL;
}
if (o == NULL) {
o = malloc(sizeof(*o));
if (o == NULL)
continue;
o->value_long = (unsigned long)ck_pr_faa_uint(&global_counter, 1) + 1;
o->magic = 0xdead;
o->ref = 0;
o->tid = tid;
if (ck_ring_enqueue_mpmc(&ring_mw, buffer, o) == false) {
free(o);
} else {
enqueue++;
for (size_t z = 0; z < 2; z++) {
o = malloc(sizeof(*o));
if (o == NULL)
continue;
o->value_long = (unsigned long)ck_pr_faa_uint(&global_counter, 1) + 1;
o->magic = 0xdead;
o->ref = 0;
o->tid = tid;
if (ck_ring_enqueue_mpmc(&ring_mw, buffer, o) == false) {
free(o);
} else {
enqueue++;
}
}
continue;

@ -327,7 +327,7 @@ main(int argc, char *argv[])
}
if ((int)s >= (size * ITERATIONS * (nthr - 1))) {
ck_error("MPMC: Unexpected size of %u\n", s);
ck_error("SPMC: Unexpected size of %u\n", s);
}
}
}

Loading…
Cancel
Save