Work-stealing deque for global request queue

* all cores steal from that queue.
* other bug fixes
* tested with NCORES = 2 and 4.
main
phani 5 years ago
parent 2f3ddcb9c5
commit eb8fcf00c4

@ -18,6 +18,12 @@ CFLAGS += -D_GNU_SOURCE
CFLAGS += -DUSE_UVIO
#CFLAGS += -DUSE_SYSCALL
#CFLAGS += -DPREEMPT_DISABLE
CACHE_LINESIZE := $(shell getconf LEVEL1_DCACHE_LINESIZE)
NCORES_CONF := $(shell getconf _NPROCESSORS_CONF)
#todo:cross-compile
CFLAGS += -DCACHELINE_SIZE=${CACHE_LINESIZE}
#CFLAGS += -DNCORES=${NCORES_CONF}
CFLAGS += -DNCORES=4
MAKE= make --no-print-directory
@ -43,6 +49,7 @@ tools:
@${MAKE} -C tools
clean:
@rm -f core
@echo "Cleaning up runtime"
@rm -f ${RUNTIME}
# @echo "Cleaning up tools"

@ -28,6 +28,7 @@
typedef uint64_t reg_t;
#define ARCH_NREGS (16 /* GP registers */ + 1 /* for IP */)
#define ARCH_SIG_JMP_OFF 8
/*
* This is the slowpath switch to a preempted sandbox!
@ -49,11 +50,10 @@ arch_mcontext_save(arch_context_t *ctx, mcontext_t *mc)
assert(ctx != &base_context);
ctx->regs[5] = 0;
memset(ctx->regs, 0, ARCH_NREGS * sizeof(reg_t));
memcpy(&ctx->mctx, mc, sizeof(mcontext_t));
}
static void
static int
arch_mcontext_restore(mcontext_t *mc, arch_context_t *ctx)
{
assert(ctx != &base_context);
@ -62,12 +62,16 @@ arch_mcontext_restore(mcontext_t *mc, arch_context_t *ctx)
// else restore mcontext..
if (ctx->regs[5]) {
mc->gregs[REG_RSP] = ctx->regs[5];
mc->gregs[REG_RIP] = ctx->regs[16];
mc->gregs[REG_RIP] = ctx->regs[16] + ARCH_SIG_JMP_OFF;
ctx->regs[5] = 0;
return 1;
} else {
memcpy(mc, &ctx->mctx, sizeof(mcontext_t));
memset(&ctx->mctx, 0, sizeof(mcontext_t));
}
return 0;
}
static void __attribute__((noinline))

@ -0,0 +1,113 @@
/**
* Redistribution of this file is permitted under the BSD two clause license.
*
* Copyright 2019, The George Washington University
* Author: Phani Gadepalli, phanikishoreg@gwu.edu
*/
#ifndef DEQUE_H
#define DEQUE_H
/*
* This was implemented by referring to:
* https://github.com/cpp-taskflow/cpp-taskflow/blob/9c28ccec910346a9937c40db7bdb542262053f9c/taskflow/executor/workstealing.hpp
*
* which is based on the following papers:
*
* The work stealing queue described in the paper, "Dynamic Circular Work-stealing Deque," SPAA, 2005.
* Only the queue owner can perform pop and push operations, while others can steal data from the queue.
*
* PPoPP implementation paper, "Correct and Efficient Work-Stealing for Weak Memory Models"
* https://www.di.ens.fr/~zappa/readings/ppopp13.pdf
*/
//TODO: dynamic resize!
#define DEQUE_MAX_SZ (1<<23)
#define DEQUE_PROTOTYPE(name, type)						\
/* Chase-Lev style work-stealing deque: the owner pushes/pops at the	*/	\
/* bottom, thieves steal from the top.  wrk[] is a *circular* buffer	*/	\
/* of q->size slots; top/bottom only ever grow, so every array access	*/	\
/* wraps the index with "% q->size" (without the wrap, the indices	*/	\
/* would run past wrk[] once the total number of operations exceeds	*/	\
/* the capacity, even if the deque itself never fills up).		*/	\
struct deque_##name {								\
	type wrk[DEQUE_MAX_SZ];	/* circular storage, indexed mod q->size */	\
	long size;		/* capacity in use (<= DEQUE_MAX_SZ) */		\
										\
	volatile long top;	/* next slot thieves steal from */		\
	volatile long bottom;	/* next free slot for the owner */		\
};										\
										\
/* Initialize q to hold sz entries; sz == 0 selects DEQUE_MAX_SZ. */		\
static inline void								\
deque_init_##name(struct deque_##name *q, size_t sz)				\
{										\
	memset(q, 0, sizeof(struct deque_##name));				\
										\
	if (sz) {								\
		/* only for size with pow of 2 */				\
		/* assert((sz & (sz - 1)) == 0); */				\
		assert(sz <= DEQUE_MAX_SZ);					\
	} else {								\
		sz = DEQUE_MAX_SZ;						\
	}									\
										\
	q->size = sz;								\
}										\
										\
/* Owner-only: append *w at the bottom.					*/	\
/* Returns 0 on success, -ENOSPC when the deque is full.		*/	\
/* Use mutual exclusion locks around push/pop if multi-threaded.	*/	\
static inline int								\
deque_push_##name(struct deque_##name *q, type *w)				\
{										\
	long ct, cb;								\
										\
	ct = q->top;								\
	cb = q->bottom;								\
										\
	/* nope, fixed size only */						\
	if (q->size - 1 < (cb - ct)) return -ENOSPC;				\
										\
	/* indices grow without bound: wrap into the circular buffer */		\
	q->wrk[cb % q->size] = *w;						\
	__sync_synchronize();							\
	if (__sync_bool_compare_and_swap(&q->bottom, cb, cb + 1) == false) assert(0); \
										\
	return 0;								\
}										\
										\
/* Owner-only: pop the most recently pushed entry into *w.		*/	\
/* Returns 0 on success, -ENOENT when empty (then *w is NULL on the	*/	\
/* lost-race path).  Use mutual exclusion locks if multi-threaded.	*/	\
static inline int								\
deque_pop_##name(struct deque_##name *q, type *w)				\
{										\
	long ct = 0, sz = 0;							\
	long cb = q->bottom - 1;						\
	int ret = 0;								\
										\
	/* reserve the bottom slot before inspecting top */			\
	if (__sync_bool_compare_and_swap(&q->bottom, cb + 1, cb) == false) assert(0); \
										\
	ct = q->top;								\
	sz = cb - ct;								\
	if (sz < 0) {								\
		/* empty: restore bottom and bail out */			\
		if (__sync_bool_compare_and_swap(&q->bottom, cb, ct) == false) assert(0); \
										\
		return -ENOENT;							\
	}									\
										\
	*w = q->wrk[cb % q->size];						\
	if (sz > 0) return 0;							\
										\
	/* last element: race the thieves for it via top */			\
	ret = __sync_bool_compare_and_swap(&q->top, ct, ct + 1);		\
	if (__sync_bool_compare_and_swap(&q->bottom, cb, ct + 1) == false) assert(0); \
	if (ret == false) { *w = NULL; return -ENOENT; }			\
										\
	return 0;								\
}										\
										\
/* Thief-side: take the oldest entry from the top.			*/	\
/* Returns 0 on success, -ENOENT when empty, -EAGAIN on a lost race.	*/	\
static inline int								\
deque_steal_##name(struct deque_##name *q, type *w)				\
{										\
	long ct, cb;								\
										\
	ct = q->top;								\
	cb = q->bottom;								\
										\
	if (ct >= cb) return -ENOENT;						\
										\
	*w = q->wrk[ct % q->size];						\
	if (__sync_bool_compare_and_swap(&q->top, ct, ct + 1) == false) return -EAGAIN; \
										\
	return 0;								\
}
#endif /* DEQUE_H */

@ -5,6 +5,10 @@
#include <uv.h>
#include "sandbox.h"
// global queue for stealing (work-stealing-deque)
extern struct deque_sandbox *glb_dq;
extern pthread_mutex_t glbq_mtx;
void alloc_linear_memory(void);
void expand_memory(void);
void free_linear_memory(void *base, u32 bound, u32 max);

@ -7,6 +7,7 @@
#include "softint.h"
#include <ucontext.h>
#include <uv.h>
#include "deque.h"
struct io_handle {
int fd;
@ -63,7 +64,9 @@ struct sandbox {
struct ps_list list;
// track I/O handles?
};
} CACHE_ALIGNED;
DEQUE_PROTOTYPE(sandbox, struct sandbox *);
// a runtime resource, malloc on this!
struct sandbox *sandbox_alloc(struct module *mod, char *args);
@ -83,13 +86,6 @@ sandbox_current(void)
static inline void
sandbox_current_set(struct sandbox *sbox)
{
int dis = 0;
if (softint_enabled()) {
dis = 1;
softint_disable();
}
// FIXME: critical-section.
current_sandbox = sbox;
if (sbox == NULL) return;
@ -98,8 +94,6 @@ sandbox_current_set(struct sandbox *sbox)
sandbox_lmbound = sbox->linear_size;
// TODO: module table or sandbox table?
module_indirect_table = sbox->mod->indirect_table;
if (dis) softint_enable();
}
static inline struct module *
@ -149,6 +143,56 @@ void sandbox_wakeup(sandbox_t *sb);
// should have been called with stack allocated and sandbox_current() set!
void sandbox_entry(void);
void sandbox_exit(void);
extern struct deque_sandbox *glb_dq;
extern pthread_mutex_t glbq_mtx;
/* Push sandbox s onto the global work-stealing deque (glb_dq).
 * Returns 0 on success or the negative errno from deque_push_sandbox
 * (-ENOSPC when the deque is full).
 * With NCORES == 1 the same core both pushes and consumes from the queue
 * (steal degrades to pop), so glbq_mtx serializes push/pop there; with
 * more cores the deque's owner/thief protocol is relied on instead. */
static inline int
sandbox_deque_push(struct sandbox *s)
{
int ret;
#if NCORES == 1
pthread_mutex_lock(&glbq_mtx);
#endif
ret = deque_push_sandbox(glb_dq, &s);
#if NCORES == 1
pthread_mutex_unlock(&glbq_mtx);
#endif
return ret;
}
/* Pop the most recently pushed sandbox from the global deque into *s.
 * Returns 0 on success or a negative errno (-ENOENT when empty).
 * Mirrors sandbox_deque_push: under NCORES == 1 the single core acts as
 * both owner and thief, so glbq_mtx guards the owner-side pop. */
static inline int
sandbox_deque_pop(struct sandbox **s)
{
int ret;
#if NCORES == 1
pthread_mutex_lock(&glbq_mtx);
#endif
ret = deque_pop_sandbox(glb_dq, s);
#if NCORES == 1
pthread_mutex_unlock(&glbq_mtx);
#endif
return ret;
}
/* Take one sandbox from the global deque for this core.
 * Returns the stolen sandbox, or NULL if the deque is empty (or the
 * steal lost a race and should be retried later by the caller).
 * With NCORES == 1 there is no other core to steal from, so this falls
 * back to the owner-side pop (which takes glbq_mtx internally). */
static inline struct sandbox *
sandbox_deque_steal(void)
{
struct sandbox *s = NULL;
#if NCORES == 1
sandbox_deque_pop(&s);
#else
// TODO: check! is there a sandboxing thread on same core as udp-server thread?
int r = deque_steal_sandbox(glb_dq, &s);
if (r) s = NULL;
#endif
return s;
}
static inline int
io_handle_preopen(void)

@ -23,6 +23,11 @@
#define INLINE __attribute__((always_inline))
#define WEAK __attribute__((weak))
#ifndef CACHELINE_SIZE
#define CACHELINE_SIZE 32
#endif
#define CACHE_ALIGNED __attribute__((aligned(CACHELINE_SIZE)))
// Type alias's so I don't have to write uint32_t a million times
typedef signed char i8;
typedef unsigned char u8;
@ -102,7 +107,7 @@ typedef enum {
#define SBOX_MAX_OPEN 32
#define SBOX_PREOPEN_MAGIC (707707707) // reads lol lol lol upside down
#define SOFTINT_TIMER_START_USEC (10*1000) //start timers 10 us from now.
#define SOFTINT_TIMER_START_USEC (10*1000) //start timers 10 ms from now.
#define SOFTINT_TIMER_PERIOD_USEC (1000*100) // 100ms timer..
#ifdef DEBUG
@ -124,6 +129,7 @@ typedef enum {
#define RDWR_VEC_MAX 16
#define MOD_REQ_CORE 0 // core dedicated to check module requests..
#define SBOX_NCORES 2 // number of sandboxing threads/cores..
#define SBOX_NCORES (NCORES > 1 ? NCORES - 1 : NCORES) // number of sandboxing threads
#define SBOX_MAX_REQS (1<<19) //random!
#endif /* SFRT_TYPES_H */

@ -16,6 +16,8 @@ i32 logfd = -1;
u32 ncores = 0, sbox_ncores = 0, sbox_core_st = 0;
pthread_t rtthd[SBOX_NCORES];
static void
usage(char *cmd)
{
@ -28,7 +30,7 @@ main(int argc, char* argv[])
{
#ifndef STANDALONE
int i = 0, rtthd_ret[SBOX_NCORES] = { 0 };
pthread_t rtthd[SBOX_NCORES];
memset(rtthd, 0, sizeof(pthread_t)*SBOX_NCORES);
if (argc != 2) {
usage(argv[0]);

@ -9,9 +9,8 @@
#include <softint.h>
#include <uv.h>
// global queue for stealing! TODO: work-stealing-deque
static struct ps_list_head glbq;
static pthread_mutex_t glbq_mtx = PTHREAD_MUTEX_INITIALIZER;
struct deque_sandbox *glb_dq;
pthread_mutex_t glbq_mtx = PTHREAD_MUTEX_INITIALIZER;
// per-thread (per-core) run and wait queues.. (using doubly-linked-lists)
__thread static struct ps_list_head runq;
@ -33,7 +32,7 @@ static inline void
sandbox_local_run(struct sandbox *s)
{
assert(ps_list_singleton_d(s));
fprintf(stderr, "(%d,%lu) %s: run %p, %s\n", sched_getcpu(), pthread_self(), __func__, s, s->mod->name);
ps_list_head_append_d(&runq, s);
}
@ -43,19 +42,11 @@ sandbox_pull(void)
int n = 0;
while (n < SBOX_PULL_MAX) {
pthread_mutex_lock(&glbq_mtx);
if (ps_list_head_empty(&glbq)) {
pthread_mutex_unlock(&glbq_mtx);
break;
}
struct sandbox *g = ps_list_head_first_d(&glbq, struct sandbox);
assert(g);
ps_list_rem_d(g);
pthread_mutex_unlock(&glbq_mtx);
debuglog("[%p: %s]\n", g, g->mod->name);
assert(g->state == SANDBOX_RUNNABLE);
sandbox_local_run(g);
struct sandbox *s = sandbox_deque_steal();
if (!s) break;
assert(s->state == SANDBOX_RUNNABLE);
sandbox_local_run(s);
n++;
}
@ -83,8 +74,6 @@ sandbox_io_nowait(void)
struct sandbox *
sandbox_schedule(void)
{
if (!in_callback) sandbox_io_nowait();
struct sandbox *s = NULL;
if (ps_list_head_empty(&runq)) {
if (sandbox_pull() == 0) {
@ -103,15 +92,32 @@ sandbox_schedule(void)
return s;
}
/* Scheduler entry used by the sandboxing-thread main loop: first drain
 * pending libuv I/O (unless we are already inside a uv callback, where
 * re-entering uv_run would be unsafe), then pick the next runnable
 * sandbox.  Software interrupts are disabled around sandbox_schedule()
 * so the run queue is not mutated by a preemption signal mid-selection.
 * Must be called with no sandbox current; returns NULL or a sandbox in
 * SANDBOX_RUNNABLE state. */
struct sandbox *
sandbox_schedule_uvio(void)
{
if (!in_callback) sandbox_io_nowait();
assert(sandbox_current() == NULL);
softint_disable();
struct sandbox *s = sandbox_schedule();
softint_enable();
assert(s == NULL || s->state == SANDBOX_RUNNABLE);
return s;
}
/* Move a BLOCKED sandbox back onto this core's run queue and mark it
 * RUNNABLE.  Software interrupts are disabled across the list update so
 * a preemption handler cannot observe (or modify) the run queue while
 * the sandbox is being re-linked.  The sandbox must currently be on no
 * list (ps_list_singleton_d).
 * NOTE(review): assumes wakeup happens on the same core that owns runq —
 * confirm against callers if sandboxes can be woken cross-thread. */
void
sandbox_wakeup(sandbox_t *s)
{
#ifndef STANDALONE
softint_disable();
debuglog("[%p: %s]\n", s, s->mod->name);
// perhaps 2 lists in the sandbox to make sure sandbox is either in runlist or waitlist..
assert(s->state == SANDBOX_BLOCKED);
assert(ps_list_singleton_d(s));
s->state = SANDBOX_RUNNABLE;
ps_list_head_append_d(&runq, s);
softint_enable();
#endif
}
@ -120,13 +126,14 @@ sandbox_block(void)
{
#ifndef STANDALONE
// perhaps 2 lists in the sandbox to make sure sandbox is either in runlist or waitlist..
assert(in_callback == 0);
softint_disable();
struct sandbox *c = sandbox_current();
ps_list_rem_d(c);
softint_enable();
debuglog("[%p: %s]\n", c, c->mod->name);
c->state = SANDBOX_BLOCKED;
struct sandbox *s = sandbox_schedule();
debuglog("[%p: %s, %p: %s]\n", c, c->mod->name, s, s ? s->mod->name: "");
softint_enable();
sandbox_switch(s);
#else
uv_run(runtime_uvio(), UV_RUN_DEFAULT);
@ -164,10 +171,10 @@ sandbox_run_func(void *data)
in_callback = 0;
while (1) {
struct sandbox *s = sandbox_schedule();
struct sandbox *s = sandbox_schedule_uvio();
while (s) {
sandbox_switch(s);
s = sandbox_schedule();
s = sandbox_schedule_uvio();
}
}
@ -184,9 +191,7 @@ sandbox_run(struct sandbox *s)
// each sandboxing thread pulls off of that global ready queue..
debuglog("[%p: %s]\n", s, s->mod->name);
s->state = SANDBOX_RUNNABLE;
pthread_mutex_lock(&glbq_mtx);
ps_list_head_append_d(&glbq, s);
pthread_mutex_unlock(&glbq_mtx);
sandbox_deque_push(s);
#else
sandbox_switch(s);
#endif
@ -201,11 +206,16 @@ sandbox_exit(void)
assert(curr);
debuglog("[%p: %s]\n", curr, curr->mod->name);
fprintf(stderr, "(%d,%lu) %s: %p, %s exit\n", sched_getcpu(), pthread_self(), __func__, curr, curr->mod->name);
//printf("%s: disable\n", __func__);
softint_disable();
sandbox_local_stop(curr);
curr->state = SANDBOX_RETURNED;
// TODO: free resources here? or only from main?
sandbox_switch(sandbox_schedule());
struct sandbox *n = sandbox_schedule();
//printf("%s: enable\n", __func__);
softint_enable();
sandbox_switch(n);
#else
sandbox_switch(NULL);
#endif
@ -229,7 +239,9 @@ runtime_uvio_thdfn(void *d)
void
runtime_init(void)
{
ps_list_head_init(&glbq);
glb_dq = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox));
assert(glb_dq);
deque_init_sandbox(glb_dq, SBOX_MAX_REQS);
softint_mask(SIGUSR1);
softint_mask(SIGALRM);

@ -116,4 +116,7 @@ sandbox_free(struct sandbox *sb)
// depending on the memory type
free_linear_memory(sb->linear_start, sb->linear_size, sb->linear_max_size);
free(sb);
// sb is a dangling pointer after free(); do not dereference it!
}

@ -56,6 +56,40 @@ softint_timer_disarm(void)
}
}
/* SIGALRM (preemption) path: decide whether to switch away from the
 * currently running sandbox.  u is the ucontext_t* delivered to the
 * signal handler; if a different runnable sandbox exists, the current
 * sandbox's register state is captured from *u and *u is rewritten so
 * that sigreturn resumes the next sandbox instead. */
static inline void
softint_alarm_schedule(void *u)
{
softint_disable(); //no nesting!
struct sandbox *curr = sandbox_current();
ucontext_t *uc = (ucontext_t *)u;
// no sandboxes running..so nothing to preempt..let the "main" scheduler run its course.
if (curr == NULL) goto done;
// find a next sandbox to run..
struct sandbox *next = sandbox_schedule();
if (next == NULL) goto done;
if (next == curr) goto done; // only this sandbox to schedule.. return to it!
// save the current sandbox, state from uc!
arch_mcontext_save(&curr->ctxt, &uc->uc_mcontext);
// sandbox_current_set on it. restore through *uc..
sandbox_current_set(next);
// restore returning nonzero means the slowpath (cooperatively-saved
// context) was taken; in that case we jump past softint_enable() below,
// leaving softints disabled across the return into the sandbox.
// NOTE(review): presumably the resumed slowpath re-enables softints on
// its side — confirm against the context-switch code.
if (arch_mcontext_restore(&uc->uc_mcontext, &next->ctxt)) goto skip;
// reset if SIGALRM happens before SIGUSR1 and if don't preempt..OR
// perhaps switch here for SIGUSR1 and see if we can clear that signal
// so it doesn't get called on SIGALRM return..
// next_context = NULL;
done:
softint_enable();
skip:
return;
}
extern pthread_t rtthd[];
static inline void
softint_handler(int sig, siginfo_t *si, void *u)
{
@ -70,7 +104,16 @@ softint_handler(int sig, siginfo_t *si, void *u)
{
// if interrupts are disabled.. increment a per_thread counter and return
if (si->si_code == SI_KERNEL) {
// TODO: deliver signal to all other runtime threads..
int rt = 0;
// deliver signal to all other runtime threads..
for (int i = 0; i < SBOX_NCORES; i++) {
if (pthread_self() == rtthd[i]) {
rt = 1;
continue;
}
pthread_kill(rtthd[i], SIGALRM);
}
assert(rt == 1);
} else {
assert(si->si_code == SI_TKILL);
}
@ -80,41 +123,20 @@ softint_handler(int sig, siginfo_t *si, void *u)
// softints per-core..
if (next_context) return;
if (!softint_enabled()) return;
// no sandboxes running..so nothing to preempt..let the "main" scheduler run its course.
if (curr == NULL) return;
// find a next sandbox to run..
struct sandbox *next = sandbox_schedule();
if (next == NULL) return;
if (next == curr) return; // only this sandbox to schedule.. return to it!
// save the current sandbox, state from uc!
arch_mcontext_save(&curr->ctxt, &uc->uc_mcontext);
// sandbox_current_set on it. restore through *uc..
sandbox_current_set(next);
arch_mcontext_restore(&uc->uc_mcontext, &next->ctxt);
// reset if SIGALRM happens before SIGUSR1 and if don't preempt..OR
// perhaps switch here for SIGUSR1 and see if we can clear that signal
// so it doesn't get called on SIGALRM return..
// next_context = NULL;
softint_alarm_schedule(u);
break;
}
case SIGUSR1:
{
// make sure sigalrm doesn't mess this up if nested..
assert(!softint_enabled());
/* we set current before calling pthread_kill! */
assert(next_context && (&curr->ctxt == next_context));
assert(si->si_code == SI_TKILL);
//debuglog("usr1:%d\n", usr1_cnt);
usr1_cnt++;
// sigalrm happened.. such a waste of time..!!
if (next_context == NULL) return;
// make sure sigalrm doesn't mess this up if nested..
assert(!softint_enabled());
// do not save current sandbox.. it is in co-operative switch..
// pick the next from "next_context"..
// assert its "sp" to be zero in regs..

@ -6,14 +6,14 @@
"argsize" : 2
},
{
"active" : "no",
"active" : "yes",
"name" : "bitcount",
"path" : "bitcount_wasm.so",
"port" : 10002,
"argsize" : 2
},
{
"active" : "no",
"active" : "yes",
"name" : "basic_math",
"path" : "basic_math_wasm.so",
"port" : 10004,
@ -110,24 +110,31 @@
"port" : 10030,
"argsize" : 1
},
{
"active" : "yes",
"name" : "stringsearch",
"path" : "stringsearch_wasm.so",
"port" : 10032,
"argsize" : 1
},
{
"active" : "no",
"name" : "filesys",
"path" : "filesys_wasm.so",
"port" : 10032,
"port" : 10034,
"argsize" : 3
},
{
"active" : "yes",
"active" : "no",
"name" : "sockserver",
"path" : "sockserver_wasm.so",
"port" : 10034,
"port" : 10036,
"argsize" : 2
},
{
"active" : "yes",
"active" : "no",
"name" : "sockclient",
"path" : "sockclient_wasm.so",
"port" : 10036,
"port" : 10038,
"argsize" : 3
}

@ -1,5 +1,5 @@
;127.0.0.1:10002${ "module" : "bitcount", "args" : [ "bitcount1" , "16777216" ] }
;127.0.0.1:10002${ "module" : "bitcount", "args" : [ "bitcount2" , "16777216" ] }
127.0.0.1:10002${ "module" : "bitcount", "args" : [ "bitcount1" , "16777216" ] }
127.0.0.1:10002${ "module" : "bitcount", "args" : [ "bitcount2" , "16777216" ] }
;127.0.0.1:10004${ "module" : "basic_math", "args" : [ "basic_math1" ] }
;127.0.0.1:10004${ "module" : "basic_math", "args" : [ "basic_math2" ] }
;127.0.0.1:10006${ "module" : "binarytrees", "args" : [ "binarytrees1", "16" ] }
@ -29,6 +29,8 @@
;127.0.0.1:10028${ "module" : "patricia", "args" : [ "patricia1" , "large.udp" ] }
;127.0.0.1:10030${ "module" : "sqlite", "args" : [ "sqlite1" ] }
;127.0.0.1:10030${ "module" : "sqlite", "args" : [ "sqlite2" ] }
;127.0.0.1:10032${ "module" : "filesys", "args" : [ "filesys1", "fs_in.txt", "fs_out.txt" ] }
127.0.0.1:10034${ "module" : "sockserver", "args" : [ "sockserv1", "20000" ] }
127.0.0.1:10036${ "module" : "sockclient", "args" : [ "sockcli1", "localhost", "20000" ] }
127.0.0.1:10032${ "module" : "stringsearch", "args" : [ "strsearch1" ] }
127.0.0.1:10032${ "module" : "stringsearch", "args" : [ "strsearch2" ] }
;127.0.0.1:10034${ "module" : "filesys", "args" : [ "filesys1", "fs_in.txt", "fs_out.txt" ] }
;127.0.0.1:10036${ "module" : "sockserver", "args" : [ "sockserv1", "20000" ] }
;127.0.0.1:10038${ "module" : "sockclient", "args" : [ "sockcli1", "localhost", "20000" ] }

Loading…
Cancel
Save