chore: cleanup runtime

sledge_graph
Sean McBride 5 years ago
parent 267a04350f
commit c1c307c390

@ -113,7 +113,7 @@ typedef enum
#define MOD_PATH_MAX 256 // Max module path length #define MOD_PATH_MAX 256 // Max module path length
#define JSON_ELE_MAX 16 // Max number of elements defined in JSON #define JSON_ELE_MAX 16 // Max number of elements defined in JSON
// FIXME: some naive work-stealing here.. // This is the max number of standboxes that get pulled onto the local runqueue in a single batch
#define SBOX_PULL_MAX 1 #define SBOX_PULL_MAX 1
#define SBOX_MAX_OPEN 32 #define SBOX_MAX_OPEN 32

@ -11,10 +11,18 @@
#include <uv.h> #include <uv.h>
#include <http_api.h> #include <http_api.h>
/***************************
* Shared Process State *
**************************/
struct deque_sandbox *global_deque; struct deque_sandbox *global_deque;
pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER;
int epoll_file_descriptor; int epoll_file_descriptor;
/***************************
* Thread Local State *
**************************/
__thread static struct ps_list_head local_run_queue; __thread static struct ps_list_head local_run_queue;
__thread static struct ps_list_head local_completion_queue; __thread static struct ps_list_head local_completion_queue;
@ -22,6 +30,7 @@ __thread static struct ps_list_head local_completion_queue;
__thread sandbox_t *current_sandbox = NULL; __thread sandbox_t *current_sandbox = NULL;
// context pointer to switch to when this thread gets a SIGUSR1 // context pointer to switch to when this thread gets a SIGUSR1
// TODO: Delete this? It doesn't seem to be used.
__thread arch_context_t *next_context = NULL; __thread arch_context_t *next_context = NULL;
// context of the runtime thread before running sandboxes or to resume its "main". // context of the runtime thread before running sandboxes or to resume its "main".
@ -30,17 +39,20 @@ __thread arch_context_t base_context;
// libuv i/o loop handle per sandboxing thread! // libuv i/o loop handle per sandboxing thread!
__thread uv_loop_t uvio; __thread uv_loop_t uvio;
// Flag to signify if the thread is currently running callbacks in the libuv event loop
static __thread unsigned int in_callback;
/** /**
* Append the sandbox to the local_run_queue * Append the sandbox to the local_run_queue
* @param s sandbox to add * @param sandbox sandbox to add
*/ */
static inline void static inline void
sandbox_local_run(struct sandbox *s) sandbox_local_run(struct sandbox *sandbox)
{ {
assert(ps_list_singleton_d(s)); assert(ps_list_singleton_d(sandbox));
// fprintf(stderr, "(%d,%lu) %s: run %p, %s\n", sched_getcpu(), pthread_self(), __func__, s, // fprintf(stderr, "(%d,%lu) %s: run %p, %s\n", sched_getcpu(), pthread_self(), __func__, s,
// s->module->name); // s->module->name);
ps_list_head_append_d(&local_run_queue, s); ps_list_head_append_d(&local_run_queue, sandbox);
} }
/** /**
@ -56,11 +68,13 @@ sandbox_pull(void)
while (total_sandboxes_pulled < SBOX_PULL_MAX) { while (total_sandboxes_pulled < SBOX_PULL_MAX) {
sbox_request_t *sandbox_request; sbox_request_t *sandbox_request;
if ((sandbox_request = sandbox_deque_steal()) == NULL) break; if ((sandbox_request = sandbox_deque_steal()) == NULL) break;
// Actually allocate the sandbox for the requests that we've pulled
struct sandbox *sandbox = sandbox_alloc(sandbox_request->module, sandbox_request->args, struct sandbox *sandbox = sandbox_alloc(sandbox_request->module, sandbox_request->args,
sandbox_request->sock, sandbox_request->addr, sandbox_request->sock, sandbox_request->addr,
sandbox_request->start_time); sandbox_request->start_time);
assert(sandbox); assert(sandbox);
free(sandbox_request); free(sandbox_request);
// Set the sandbox as runnable and place on the local runqueue
sandbox->state = SANDBOX_RUNNABLE; sandbox->state = SANDBOX_RUNNABLE;
sandbox_local_run(sandbox); sandbox_local_run(sandbox);
total_sandboxes_pulled++; total_sandboxes_pulled++;
@ -69,15 +83,12 @@ sandbox_pull(void)
return total_sandboxes_pulled; return total_sandboxes_pulled;
} }
static __thread unsigned int in_callback;
/** /**
* Run all outstanding events in the libuv event loop * Run all outstanding events in the libuv event loop
**/ **/
void void
sandbox_io_nowait(void) sandbox_io_nowait(void)
{ {
// non-zero if more callbacks are expected
in_callback = 1; in_callback = 1;
int n = uv_run(runtime_uvio(), UV_RUN_NOWAIT), i = 0; int n = uv_run(runtime_uvio(), UV_RUN_NOWAIT), i = 0;
while (n > 0) { while (n > 0) {
@ -85,39 +96,42 @@ sandbox_io_nowait(void)
uv_run(runtime_uvio(), UV_RUN_NOWAIT); uv_run(runtime_uvio(), UV_RUN_NOWAIT);
} }
in_callback = 0; in_callback = 0;
// zero, so there is nothing (don't block!)
} }
/** /**
* @param interrupt seems to be treated as boolean to indicate is_interrupt or not * Execute the sandbox at the head of the thread local runqueue
* @returns A sandbox??? * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
* @param in_interrupt if this is getting called in the context of an interrupt
* @return the sandbox to execute or NULL if none are available
**/ **/
struct sandbox * struct sandbox *
sandbox_schedule(int interrupt) sandbox_schedule(int in_interrupt)
{ {
// If the thread local runqueue is empty and we're not running in the context of an interupt,
// pull a fresh batch of sandbox requests from the global queue
if (ps_list_head_empty(&local_run_queue)) { if (ps_list_head_empty(&local_run_queue)) {
// this is in an interrupt context, don't steal work here! // this is in an interrupt context, don't steal work here!
if (interrupt) return NULL; if (in_interrupt) return NULL;
if (sandbox_pull() == 0) { if (sandbox_pull() == 0) {
// debuglog("[null: null]\n"); // debuglog("[null: null]\n");
return NULL; return NULL;
} }
} }
// Execute Round Robin Scheduling Logic
// Grab the sandbox at the head of the thread local runqueue, add it to the end, and return it
struct sandbox *sandbox = ps_list_head_first_d(&local_run_queue, struct sandbox); struct sandbox *sandbox = ps_list_head_first_d(&local_run_queue, struct sandbox);
// We are assuming that any sandboxed in the SANDBOX_RETURNED state should have been pulled from the local runqueue by now!
assert(sandbox->state != SANDBOX_RETURNED); assert(sandbox->state != SANDBOX_RETURNED);
// round-robin
ps_list_rem_d(sandbox); ps_list_rem_d(sandbox);
ps_list_head_append_d(&local_run_queue, sandbox); ps_list_head_append_d(&local_run_queue, sandbox);
debuglog("[%p: %s]\n", sandbox, sandbox->module->name); debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
return sandbox; return sandbox;
} }
/** /**
* @brief Remove and free n requests from the completion queue * @brief Remove and free n sandboxes from the thread local completion queue
* @param number_to_free The number of requests to free * @param number_to_free The number of sandboxes to free
* @return void * @return void
*/ */
static inline void static inline void
@ -125,73 +139,77 @@ sandbox_local_free(unsigned int number_to_free)
{ {
for (int i = 0; i < number_to_free; i++) { for (int i = 0; i < number_to_free; i++) {
if (ps_list_head_empty(&local_completion_queue)) break; if (ps_list_head_empty(&local_completion_queue)) break;
struct sandbox *s = ps_list_head_first_d(&local_completion_queue, struct sandbox); struct sandbox *sandbox = ps_list_head_first_d(&local_completion_queue, struct sandbox);
if (!s) break; if (!sandbox) break;
ps_list_rem_d(s); ps_list_rem_d(sandbox);
sandbox_free(s); sandbox_free(sandbox);
} }
} }
/** /**
* ??? * Tries to free a completed request, executes libuv callbacks, and then gets
* @return sandbox * and returns the standbox at the head of the thread-local runqueue
* @return sandbox or NULL
**/ **/
struct sandbox * struct sandbox *
sandbox_schedule_io(void) sandbox_schedule_io(void)
{ {
assert(sandbox_current() == NULL); assert(sandbox_current() == NULL);
// Try to free one sandbox from the completion queue
sandbox_local_free(1); sandbox_local_free(1);
// Execute libuv callbacks
if (!in_callback) sandbox_io_nowait(); if (!in_callback) sandbox_io_nowait();
// Get and return the sandbox at the head of the thread local runqueue
softint_disable(); softint_disable();
struct sandbox *s = sandbox_schedule(0); struct sandbox *sandbox = sandbox_schedule(0);
softint_enable(); softint_enable();
assert(s == NULL || s->state == SANDBOX_RUNNABLE); assert(sandbox == NULL || sandbox->state == SANDBOX_RUNNABLE);
return sandbox;
return s;
} }
/** /**
* ??? * If this sandbox is blocked, mark it as runnable and add to the head of the thread-local runqueue
* @param s sandbox * @param sandbox the sandbox to check and update if blocked
**/ **/
void void
sandbox_wakeup(sandbox_t *s) sandbox_wakeup(sandbox_t *sandbox)
{ {
softint_disable(); softint_disable();
debuglog("[%p: %s]\n", s, s->module->name); debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
if (s->state != SANDBOX_BLOCKED) goto done; if (sandbox->state != SANDBOX_BLOCKED) goto done;
assert(s->state == SANDBOX_BLOCKED); assert(sandbox->state == SANDBOX_BLOCKED);
assert(ps_list_singleton_d(s)); assert(ps_list_singleton_d(sandbox));
s->state = SANDBOX_RUNNABLE; sandbox->state = SANDBOX_RUNNABLE;
ps_list_head_append_d(&local_run_queue, s); ps_list_head_append_d(&local_run_queue, sandbox);
done: done:
softint_enable(); softint_enable();
} }
/** /**
* ??? * Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head of the runqueue
**/ **/
void void
sandbox_block(void) sandbox_block(void)
{ {
assert(in_callback == 0); assert(in_callback == 0);
softint_disable(); softint_disable();
struct sandbox *c = sandbox_current(); struct sandbox *current_sandbox = sandbox_current();
ps_list_rem_d(c); // TODO: What is this getting removed from again? the thread-local runqueue?
c->state = SANDBOX_BLOCKED; ps_list_rem_d(current_sandbox);
struct sandbox *s = sandbox_schedule(0); current_sandbox->state = SANDBOX_BLOCKED;
debuglog("[%p: %s, %p: %s]\n", c, c->module->name, s, s ? s->module->name : ""); struct sandbox *next_sandbox = sandbox_schedule(0);
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : "");
softint_enable(); softint_enable();
sandbox_switch(s); sandbox_switch(next_sandbox);
} }
/** /**
* ??? * TODO: What is this doing?
**/ **/
void void
sandbox_block_http(void) sandbox_block_http(void)
@ -202,9 +220,9 @@ sandbox_block_http(void)
// great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do // great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do
// async block! // async block!
uv_run(runtime_uvio(), UV_RUN_DEFAULT); uv_run(runtime_uvio(), UV_RUN_DEFAULT);
#else #else /* USE_HTTP_SYNC */
sandbox_block(); sandbox_block();
#endif #endif /* USE_HTTP_UVIO */
#else #else
assert(0); assert(0);
// it should not be called if not using uvio for http // it should not be called if not using uvio for http
@ -213,7 +231,7 @@ sandbox_block_http(void)
/** /**
* ??? * TODO: What is this doing?
**/ **/
void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void) void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void)
{ {
@ -225,13 +243,14 @@ void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(
} }
/** /**
* ??? * Removes the thread from the thread-local runqueue
* @param s sandbox * TODO: is this correct?
* @param sandbox sandbox
**/ **/
static inline void static inline void
sandbox_local_stop(struct sandbox *s) sandbox_local_stop(struct sandbox *sandbox)
{ {
ps_list_rem_d(s); ps_list_rem_d(sandbox);
} }
/** /**
@ -246,11 +265,12 @@ sandbox_local_end(struct sandbox *sandbox)
} }
/** /**
* ??? * The entry function for sandbox worker threads
* @param data - argument provided by pthread API. We set to -1 on error * Initializes thread-local state, unmasks signals, sets up libuv loop and
* @param return_code - argument provided by pthread API. We set to -1 on error
**/ **/
void * void *
sandbox_run_func(void *data) sandbox_run_func(void *return_code)
{ {
arch_context_init(&base_context, 0, 0); arch_context_init(&base_context, 0, 0);
@ -266,19 +286,22 @@ sandbox_run_func(void *data)
in_callback = 0; in_callback = 0;
while (true) { while (true) {
struct sandbox *s = sandbox_schedule_io(); struct sandbox *sandbox = sandbox_schedule_io();
while (s) { while (sandbox) {
sandbox_switch(s); sandbox_switch(sandbox);
s = sandbox_schedule_io(); sandbox = sandbox_schedule_io();
} }
} }
*(int *)data = -1; *(int *)return_code = -1;
pthread_exit(data); pthread_exit(return_code);
} }
/** /**
* ??? * Called when the function in the sandbox exits
* Removes the standbox from the thread-local runqueue, sets its state to RETURNED,
* releases the linear memory, and then switches to the sandbox at the head of the runqueue
* TODO: Why are we not adding to the completion queue here? That logic is commented out.
**/ **/
void void
sandbox_exit(void) sandbox_exit(void)
@ -286,22 +309,23 @@ sandbox_exit(void)
struct sandbox *current_sandbox = sandbox_current(); struct sandbox *current_sandbox = sandbox_current();
assert(current_sandbox); assert(current_sandbox);
softint_disable(); softint_disable();
// Remove from the runqueue
sandbox_local_stop(current_sandbox); sandbox_local_stop(current_sandbox);
current_sandbox->state = SANDBOX_RETURNED; current_sandbox->state = SANDBOX_RETURNED;
// free resources from "main function execution", as stack still in use. // free resources from "main function execution", as stack still in use.
struct sandbox *n = sandbox_schedule(0); struct sandbox *next_sandbox = sandbox_schedule(0);
assert(n != current_sandbox); assert(next_sandbox != current_sandbox);
softint_enable(); softint_enable();
// unmap linear memory only! // unmap linear memory only!
munmap(current_sandbox->linear_start, SBOX_MAX_MEM + PAGE_SIZE); munmap(current_sandbox->linear_start, SBOX_MAX_MEM + PAGE_SIZE);
// sandbox_local_end(current_sandbox); // sandbox_local_end(current_sandbox);
sandbox_switch(n); sandbox_switch(next_sandbox);
} }
/** /**
* @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the * @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the
* sandbox object to the global dequeue * sandbox object to the global dequeue
* @param d Unknown * @param dummy data pointer provided by pthreads API. Unused in this function
* @return NULL * @return NULL
* *
* Used Globals: * Used Globals:
@ -309,35 +333,42 @@ sandbox_exit(void)
* *
*/ */
void * void *
runtime_accept_thdfn(void *d) runtime_accept_thdfn(void *dummy)
{ {
struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event));
int total_requests = 0; int total_requests = 0;
while (true) { while (true) {
int ready = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1); int request_count = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1);
u64 start_time = rdtsc(); u64 start_time = rdtsc();
for (int i = 0; i < ready; i++) { for (int i = 0; i < request_count; i++) {
if (epoll_events[i].events & EPOLLERR) { if (epoll_events[i].events & EPOLLERR) {
perror("epoll_wait"); perror("epoll_wait");
assert(0); assert(0);
} }
struct sockaddr_in client; struct sockaddr_in client;
socklen_t client_len = sizeof(client); socklen_t client_length = sizeof(client);
struct module * m = (struct module *)epoll_events[i].data.ptr; struct module * module = (struct module *)epoll_events[i].data.ptr;
assert(m); assert(module);
int es = m->socket_descriptor; int es = module->socket_descriptor;
int s = accept(es, (struct sockaddr *)&client, &client_len); int socket_descriptor = accept(es, (struct sockaddr *)&client, &client_length);
if (s < 0) { if (socket_descriptor < 0) {
perror("accept"); perror("accept");
assert(0); assert(0);
} }
total_requests++; total_requests++;
printf("Received Request %d at %lu\n", total_requests, start_time); printf("Received Request %d at %lu\n", total_requests, start_time);
sbox_request_t *sb = sbox_request_alloc(m, m->name, s, (const struct sockaddr *)&client, sbox_request_t *sandbox_request = sbox_request_alloc(
module,
module->name,
socket_descriptor,
(const struct sockaddr *)&client,
start_time); start_time);
assert(sb); assert(sandbox_request);
// TODO: Refactor sbox_request_alloc to not add to global request queue and do this here
} }
} }

Loading…
Cancel
Save