review: First round of review changes

The constants and macros in ck_cohort.h didn't conform to CK naming conventions.  Additionally, I was able to remove a number of unnecessary atomic operations and memory fences, which reduced latency by 10-20% and increased throughput by nearly an order of magnitude when using ticket locks.
Brendon Scheinman 12 years ago
parent e06b4a26ff
commit 44ea3a4688
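For reference, below is a minimal usage sketch of the renamed interface, mirroring how the regression tests in this commit instantiate a cohort. It is illustrative only and not part of the change: the lock names, setup(), and worker() are placeholders, and it assumes ck_spinlock_fas locks for both the global and local tiers.

#include <ck_cohort.h>
#include <ck_spinlock.h>

/*
 * TG and TL must be bare type names so that TG##_lock() and TL##_lock()
 * inside the generated functions expand to the underlying lock operations
 * (ck_spinlock_fas_lock and friends).
 */
typedef ck_spinlock_fas_t ck_spinlock_fas;
CK_COHORT_PROTOTYPE(fas_fas, ck_spinlock_fas, ck_spinlock_fas)

static ck_spinlock_fas global_lock = CK_SPINLOCK_FAS_INITIALIZER;
static ck_spinlock_fas local_lock = CK_SPINLOCK_FAS_INITIALIZER;
static struct ck_cohort_fas_fas cohort = CK_COHORT_INITIALIZER;

static void
setup(void)
{

	/* Bind the cohort to one global lock and one local (per-node) lock. */
	ck_cohort_fas_fas_init(&cohort, &global_lock, &local_lock);
	return;
}

static void
worker(void)
{

	ck_cohort_fas_fas_lock(&cohort);
	/* ... critical section ... */
	ck_cohort_fas_fas_unlock(&cohort);
	return;
}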

@@ -28,18 +28,19 @@
 #ifndef _CK_COHORT_H
 #define _CK_COHORT_H
 #include <ck_cc.h>
 #include <ck_pr.h>
 #include <stdbool.h>
 #include <stddef.h>
-#define RELEASE_STATE_GLOBAL 0
-#define RELEASE_STATE_LOCAL 1
+#define CK_COHORT_RELEASE_STATE_GLOBAL 0
+#define CK_COHORT_RELEASE_STATE_LOCAL 1
-#define DEFAULT_LOCAL_PASS_LIMIT 10
+#define CK_COHORT_DEFAULT_LOCAL_PASS_LIMIT 10
-#define CK_CREATE_COHORT(N, TG, TL) \
-struct ck_cohort_##N { \
+#define CK_COHORT_INSTANCE(N) ck_cohort_##N
+#define CK_COHORT_PROTOTYPE(N, TG, TL) \
+struct CK_COHORT_INSTANCE(N) { \
 TG *global_lock; \
 TL *local_lock; \
 unsigned int release_state; \
@@ -54,77 +55,56 @@
 { \
 ck_pr_store_ptr(&cohort->global_lock, global_lock); \
 ck_pr_store_ptr(&cohort->local_lock, local_lock); \
-ck_pr_store_uint(&cohort->release_state, RELEASE_STATE_GLOBAL); \
+ck_pr_store_uint(&cohort->release_state, \
+CK_COHORT_RELEASE_STATE_GLOBAL); \
 ck_pr_store_uint(&cohort->waiting_threads, 0); \
 ck_pr_store_uint(&cohort->acquire_count, 0); \
 ck_pr_store_uint(&cohort->local_pass_limit, \
-DEFAULT_LOCAL_PASS_LIMIT); \
+CK_COHORT_DEFAULT_LOCAL_PASS_LIMIT); \
 return; \
 } \
 \
 CK_CC_INLINE static void \
 ck_cohort_##N##_lock(struct ck_cohort_##N *cohort) \
 { \
-unsigned int release_state; \
 \
-/* \
-* Fence memory right after this increment to maximize the chance \
-* that another releasing thread will hold onto the global lock \
-* if possible. If the timing works out such that it relinquishes \
-* the global lock when it doesn't have to, that's a potential \
-* performance hit but it doesn't violate correctness. \
-*/ \
 ck_pr_inc_uint(&cohort->waiting_threads); \
-ck_pr_fence_memory(); \
-\
-TL##_lock((TL *) ck_pr_load_ptr(&cohort->local_lock)); \
+TL##_lock(cohort->local_lock); \
 ck_pr_dec_uint(&cohort->waiting_threads); \
 \
-release_state = ck_pr_load_uint(&cohort->release_state); \
-if (release_state == RELEASE_STATE_GLOBAL) { \
-TG##_lock((TG *) ck_pr_load_ptr(&cohort->global_lock)); \
-ck_pr_store_uint(&cohort->release_state, RELEASE_STATE_LOCAL); \
+if (cohort->release_state == CK_COHORT_RELEASE_STATE_GLOBAL) { \
+TG##_lock(cohort->global_lock); \
+cohort->release_state = CK_COHORT_RELEASE_STATE_LOCAL; \
 } \
 \
-/* \
-* We can increment this count any time between now and when \
-* we release the lock, but we may as well do it now because we're \
-* about to fence memory anyway. \
-*/ \
-ck_pr_inc_uint(&cohort->acquire_count); \
-\
+++cohort->acquire_count; \
 ck_pr_fence_memory(); \
 return; \
 } \
 \
-CK_CC_INLINE static bool \
-ck_cohort_##N##_may_pass_local(struct ck_cohort_##N *cohort) \
-{ \
-return ck_pr_load_uint(&cohort->acquire_count) < \
-ck_pr_load_uint(&cohort->local_pass_limit); \
-} \
-\
 CK_CC_INLINE static void \
 ck_cohort_##N##_unlock(struct ck_cohort_##N *cohort) \
 { \
 if (ck_pr_load_uint(&cohort->waiting_threads) > 0 \
-&& ck_cohort_##N##_may_pass_local(cohort)) { \
-ck_pr_store_uint(&cohort->release_state, RELEASE_STATE_LOCAL); \
+&& cohort->acquire_count < cohort->local_pass_limit) { \
+cohort->release_state = CK_COHORT_RELEASE_STATE_LOCAL; \
 } else { \
-TG##_unlock((TG *) ck_pr_load_ptr(&cohort->global_lock)); \
-ck_pr_store_uint(&cohort->release_state, RELEASE_STATE_GLOBAL); \
-ck_pr_store_uint(&cohort->acquire_count, 0); \
+TG##_unlock(cohort->global_lock); \
+cohort->release_state = CK_COHORT_RELEASE_STATE_GLOBAL; \
+cohort->acquire_count = 0; \
 } \
 \
-TL##_unlock((TL *) ck_pr_load_ptr(&cohort->local_lock)); \
-\
 ck_pr_fence_memory(); \
+TL##_unlock(cohort->local_lock); \
 \
 return; \
 }
-#define CK_COHORT_INITIALIZER\
-{ NULL, NULL, RELEASE_STATE_GLOBAL, 0, 0, DEFAULT_LOCAL_PASS_LIMIT }
+#define CK_COHORT_INITIALIZER \
+{ .global_lock = NULL, .local_lock = NULL, \
+.release_state = CK_COHORT_RELEASE_STATE_GLOBAL, \
+.waiting_threads = 0, .acquire_count = 0, \
+.local_pass_limit = CK_COHORT_DEFAULT_LOCAL_PASS_LIMIT }
 #endif /* _CK_COHORT_H */

@@ -58,13 +58,14 @@ static unsigned int barrier;
 int critical __attribute__((aligned(64)));
 typedef ck_spinlock_fas_t ck_spinlock_fas;
-CK_CREATE_COHORT(fas_fas, ck_spinlock_fas, ck_spinlock_fas)
+typedef ck_spinlock_ticket_t ck_spinlock_ticket;
+CK_COHORT_PROTOTYPE(fas_fas, ck_spinlock_ticket, ck_spinlock_ticket)
 static struct ck_cohort_fas_fas *cohorts;
-static ck_spinlock_fas_t global_fas_lock = CK_SPINLOCK_FAS_INITIALIZER;
+static ck_spinlock_ticket_t global_ticket_lock = CK_SPINLOCK_TICKET_INITIALIZER;
 struct block {
 unsigned int tid;
 struct ck_cohort_fas_fas *cohort;
 };
@@ -74,13 +75,16 @@ fairness(void *null)
 unsigned int i = context->tid;
 volatile int j;
 long int base;
-struct ck_cohort_fas_fas *cohort = context->cohort;
+unsigned int core;
+struct ck_cohort_fas_fas *cohort;
-if (aff_iterate(&a)) {
+if (aff_iterate_core(&a, &core)) {
 perror("ERROR: Could not affine thread");
 exit(EXIT_FAILURE);
 }
+cohort = cohorts + (core / (int)(a.delta)) % n_cohorts;
 while (ck_pr_load_uint(&ready) == 0);
 ck_pr_inc_uint(&barrier);
@@ -89,6 +93,7 @@ fairness(void *null)
 while (ready) {
 ck_cohort_fas_fas_lock(cohort);
+fprintf(stderr, "lock acquired by thread %i\n", i);
 count[i].value++;
 if (critical) {
 base = common_lrand48() % critical;
@@ -108,7 +113,7 @@ main(int argc, char *argv[])
 unsigned int i;
 pthread_t *threads;
 struct block *context;
-ck_spinlock_fas_t *local_fas_locks;
+ck_spinlock_ticket_t *local_fas_locks;
 if (argc != 5) {
 ck_error("Usage: ck_cohort <number of cohorts> <threads per cohort> "
@@ -146,7 +151,7 @@ main(int argc, char *argv[])
 exit(EXIT_FAILURE);
 }
-local_fas_locks = calloc(n_cohorts, sizeof(ck_spinlock_fas_t));
+local_fas_locks = calloc(n_cohorts, sizeof(ck_spinlock_ticket_t));
 if (local_fas_locks == NULL) {
 ck_error("ERROR: Could not allocate local lock structures\n");
 exit(EXIT_FAILURE);
@@ -170,14 +175,13 @@ main(int argc, char *argv[])
 fprintf(stderr, "Creating cohorts...");
 for (i = 0 ; i < n_cohorts ; i++) {
-ck_cohort_fas_fas_init(cohorts + i, &global_fas_lock, local_fas_locks + i);
+ck_cohort_fas_fas_init(cohorts + i, &global_ticket_lock, local_fas_locks + i);
 }
 fprintf(stderr, "done\n");
 fprintf(stderr, "Creating threads (fairness)...");
 for (i = 0; i < nthr; i++) {
 context[i].tid = i;
-context[i].cohort = cohorts + (i % n_cohorts);
 if (pthread_create(&threads[i], NULL, fairness, context + i)) {
 ck_error("ERROR: Could not create thread %d\n", i);
 exit(EXIT_FAILURE);

@@ -3,7 +3,7 @@
 typedef ck_spinlock_fas_t ck_spinlock_fas;\
 static ck_spinlock_fas global_fas_lock = CK_SPINLOCK_FAS_INITIALIZER;\
 static ck_spinlock_fas local_fas_lock = CK_SPINLOCK_FAS_INITIALIZER;\
-CK_CREATE_COHORT(fas_fas, ck_spinlock_fas, ck_spinlock_fas)\
+CK_COHORT_PROTOTYPE(fas_fas, ck_spinlock_fas, ck_spinlock_fas)\
 static struct ck_cohort_fas_fas CK_CC_CACHELINE cohort = CK_COHORT_INITIALIZER
 #define LOCK_INIT ck_cohort_fas_fas_init(&cohort, &global_fas_lock, &local_fas_lock)
 #define LOCK ck_cohort_fas_fas_lock(&cohort)

@@ -45,7 +45,7 @@ static unsigned int locked;
 static int nthr;
 static ck_spinlock_fas_t global_fas_lock = CK_SPINLOCK_FAS_INITIALIZER;
 typedef ck_spinlock_fas_t ck_spinlock_fas;
-CK_CREATE_COHORT(fas_fas, ck_spinlock_fas, ck_spinlock_fas)
+CK_COHORT_PROTOTYPE(fas_fas, ck_spinlock_fas, ck_spinlock_fas)
 static struct ck_cohort_fas_fas *cohorts;
 static int n_cohorts;
@@ -62,7 +62,7 @@ thread(void *null CK_CC_UNUSED)
 exit(EXIT_FAILURE);
 }
-cohort = cohorts + (core / (int)ck_pr_load_uint(&a.delta)) % n_cohorts;
+cohort = cohorts + (core / (int)(a.delta)) % n_cohorts;
 while (i--) {
 ck_cohort_fas_fas_lock(cohort);
