From c1c307c39040565a00d650aa8f77593fdcf830c4 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Sat, 7 Mar 2020 22:15:00 -0500 Subject: [PATCH] chore: cleanup runtime --- runtime/include/types.h | 2 +- runtime/src/runtime.c | 187 +++++++++++++++++++++++----------------- 2 files changed, 110 insertions(+), 79 deletions(-) diff --git a/runtime/include/types.h b/runtime/include/types.h index 27aa6d3..35d131b 100644 --- a/runtime/include/types.h +++ b/runtime/include/types.h @@ -113,7 +113,7 @@ typedef enum #define MOD_PATH_MAX 256 // Max module path length #define JSON_ELE_MAX 16 // Max number of elements defined in JSON -// FIXME: some naive work-stealing here.. +// This is the max number of standboxes that get pulled onto the local runqueue in a single batch #define SBOX_PULL_MAX 1 #define SBOX_MAX_OPEN 32 diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index 7a59499..e200314 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -11,10 +11,18 @@ #include #include +/*************************** + * Shared Process State * + **************************/ + struct deque_sandbox *global_deque; pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER; int epoll_file_descriptor; +/*************************** + * Thread Local State * + **************************/ + __thread static struct ps_list_head local_run_queue; __thread static struct ps_list_head local_completion_queue; @@ -22,6 +30,7 @@ __thread static struct ps_list_head local_completion_queue; __thread sandbox_t *current_sandbox = NULL; // context pointer to switch to when this thread gets a SIGUSR1 +// TODO: Delete this? It doesn't seem to be used. __thread arch_context_t *next_context = NULL; // context of the runtime thread before running sandboxes or to resume its "main". @@ -30,17 +39,20 @@ __thread arch_context_t base_context; // libuv i/o loop handle per sandboxing thread! __thread uv_loop_t uvio; +// Flag to signify if the thread is currently running callbacks in the libuv event loop +static __thread unsigned int in_callback; + /** * Append the sandbox to the local_run_queue - * @param s sandbox to add + * @param sandbox sandbox to add */ static inline void -sandbox_local_run(struct sandbox *s) +sandbox_local_run(struct sandbox *sandbox) { - assert(ps_list_singleton_d(s)); - // fprintf(stderr, "(%d,%lu) %s: run %p, %s\n", sched_getcpu(), pthread_self(), __func__, s, + assert(ps_list_singleton_d(sandbox)); + // fprintf(stderr, "(%d,%lu) %s: run %p, %s\n", sched_getcpu(), pthread_self(), __func__, s, // s->module->name); - ps_list_head_append_d(&local_run_queue, s); + ps_list_head_append_d(&local_run_queue, sandbox); } /** @@ -56,11 +68,13 @@ sandbox_pull(void) while (total_sandboxes_pulled < SBOX_PULL_MAX) { sbox_request_t *sandbox_request; if ((sandbox_request = sandbox_deque_steal()) == NULL) break; + // Actually allocate the sandbox for the requests that we've pulled struct sandbox *sandbox = sandbox_alloc(sandbox_request->module, sandbox_request->args, sandbox_request->sock, sandbox_request->addr, sandbox_request->start_time); assert(sandbox); free(sandbox_request); + // Set the sandbox as runnable and place on the local runqueue sandbox->state = SANDBOX_RUNNABLE; sandbox_local_run(sandbox); total_sandboxes_pulled++; @@ -69,15 +83,12 @@ sandbox_pull(void) return total_sandboxes_pulled; } -static __thread unsigned int in_callback; - /** * Run all outstanding events in the libuv event loop **/ void sandbox_io_nowait(void) { - // non-zero if more callbacks are expected in_callback = 1; int n = uv_run(runtime_uvio(), UV_RUN_NOWAIT), i = 0; while (n > 0) { @@ -85,39 +96,42 @@ sandbox_io_nowait(void) uv_run(runtime_uvio(), UV_RUN_NOWAIT); } in_callback = 0; - // zero, so there is nothing (don't block!) } /** - * @param interrupt seems to be treated as boolean to indicate is_interrupt or not - * @returns A sandbox??? + * Execute the sandbox at the head of the thread local runqueue + * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head + * @param in_interrupt if this is getting called in the context of an interrupt + * @return the sandbox to execute or NULL if none are available **/ struct sandbox * -sandbox_schedule(int interrupt) +sandbox_schedule(int in_interrupt) { + // If the thread local runqueue is empty and we're not running in the context of an interupt, + // pull a fresh batch of sandbox requests from the global queue if (ps_list_head_empty(&local_run_queue)) { // this is in an interrupt context, don't steal work here! - if (interrupt) return NULL; + if (in_interrupt) return NULL; if (sandbox_pull() == 0) { // debuglog("[null: null]\n"); return NULL; } } + // Execute Round Robin Scheduling Logic + // Grab the sandbox at the head of the thread local runqueue, add it to the end, and return it struct sandbox *sandbox = ps_list_head_first_d(&local_run_queue, struct sandbox); - + // We are assuming that any sandboxed in the SANDBOX_RETURNED state should have been pulled from the local runqueue by now! assert(sandbox->state != SANDBOX_RETURNED); - // round-robin ps_list_rem_d(sandbox); ps_list_head_append_d(&local_run_queue, sandbox); debuglog("[%p: %s]\n", sandbox, sandbox->module->name); - return sandbox; } /** - * @brief Remove and free n requests from the completion queue - * @param number_to_free The number of requests to free + * @brief Remove and free n sandboxes from the thread local completion queue + * @param number_to_free The number of sandboxes to free * @return void */ static inline void @@ -125,73 +139,77 @@ sandbox_local_free(unsigned int number_to_free) { for (int i = 0; i < number_to_free; i++) { if (ps_list_head_empty(&local_completion_queue)) break; - struct sandbox *s = ps_list_head_first_d(&local_completion_queue, struct sandbox); - if (!s) break; - ps_list_rem_d(s); - sandbox_free(s); + struct sandbox *sandbox = ps_list_head_first_d(&local_completion_queue, struct sandbox); + if (!sandbox) break; + ps_list_rem_d(sandbox); + sandbox_free(sandbox); } } /** - * ??? - * @return sandbox + * Tries to free a completed request, executes libuv callbacks, and then gets + * and returns the standbox at the head of the thread-local runqueue + * @return sandbox or NULL **/ struct sandbox * sandbox_schedule_io(void) { assert(sandbox_current() == NULL); + // Try to free one sandbox from the completion queue sandbox_local_free(1); + // Execute libuv callbacks if (!in_callback) sandbox_io_nowait(); + // Get and return the sandbox at the head of the thread local runqueue softint_disable(); - struct sandbox *s = sandbox_schedule(0); + struct sandbox *sandbox = sandbox_schedule(0); softint_enable(); - assert(s == NULL || s->state == SANDBOX_RUNNABLE); - - return s; + assert(sandbox == NULL || sandbox->state == SANDBOX_RUNNABLE); + return sandbox; } /** - * ??? - * @param s sandbox + * If this sandbox is blocked, mark it as runnable and add to the head of the thread-local runqueue + * @param sandbox the sandbox to check and update if blocked **/ void -sandbox_wakeup(sandbox_t *s) +sandbox_wakeup(sandbox_t *sandbox) { softint_disable(); - debuglog("[%p: %s]\n", s, s->module->name); - if (s->state != SANDBOX_BLOCKED) goto done; - assert(s->state == SANDBOX_BLOCKED); - assert(ps_list_singleton_d(s)); - s->state = SANDBOX_RUNNABLE; - ps_list_head_append_d(&local_run_queue, s); + debuglog("[%p: %s]\n", sandbox, sandbox->module->name); + if (sandbox->state != SANDBOX_BLOCKED) goto done; + assert(sandbox->state == SANDBOX_BLOCKED); + assert(ps_list_singleton_d(sandbox)); + sandbox->state = SANDBOX_RUNNABLE; + ps_list_head_append_d(&local_run_queue, sandbox); done: softint_enable(); } /** - * ??? + * Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head of the runqueue **/ void sandbox_block(void) { assert(in_callback == 0); softint_disable(); - struct sandbox *c = sandbox_current(); - ps_list_rem_d(c); - c->state = SANDBOX_BLOCKED; - struct sandbox *s = sandbox_schedule(0); - debuglog("[%p: %s, %p: %s]\n", c, c->module->name, s, s ? s->module->name : ""); + struct sandbox *current_sandbox = sandbox_current(); + // TODO: What is this getting removed from again? the thread-local runqueue? + ps_list_rem_d(current_sandbox); + current_sandbox->state = SANDBOX_BLOCKED; + struct sandbox *next_sandbox = sandbox_schedule(0); + debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : ""); softint_enable(); - sandbox_switch(s); + sandbox_switch(next_sandbox); } /** - * ??? + * TODO: What is this doing? **/ void sandbox_block_http(void) @@ -202,9 +220,9 @@ sandbox_block_http(void) // great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do // async block! uv_run(runtime_uvio(), UV_RUN_DEFAULT); -#else +#else /* USE_HTTP_SYNC */ sandbox_block(); -#endif +#endif /* USE_HTTP_UVIO */ #else assert(0); // it should not be called if not using uvio for http @@ -213,7 +231,7 @@ sandbox_block_http(void) /** - * ??? + * TODO: What is this doing? **/ void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void) { @@ -225,13 +243,14 @@ void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt( } /** - * ??? - * @param s sandbox + * Removes the thread from the thread-local runqueue + * TODO: is this correct? + * @param sandbox sandbox **/ static inline void -sandbox_local_stop(struct sandbox *s) +sandbox_local_stop(struct sandbox *sandbox) { - ps_list_rem_d(s); + ps_list_rem_d(sandbox); } /** @@ -246,11 +265,12 @@ sandbox_local_end(struct sandbox *sandbox) } /** - * ??? - * @param data - argument provided by pthread API. We set to -1 on error + * The entry function for sandbox worker threads + * Initializes thread-local state, unmasks signals, sets up libuv loop and + * @param return_code - argument provided by pthread API. We set to -1 on error **/ void * -sandbox_run_func(void *data) +sandbox_run_func(void *return_code) { arch_context_init(&base_context, 0, 0); @@ -266,19 +286,22 @@ sandbox_run_func(void *data) in_callback = 0; while (true) { - struct sandbox *s = sandbox_schedule_io(); - while (s) { - sandbox_switch(s); - s = sandbox_schedule_io(); + struct sandbox *sandbox = sandbox_schedule_io(); + while (sandbox) { + sandbox_switch(sandbox); + sandbox = sandbox_schedule_io(); } } - *(int *)data = -1; - pthread_exit(data); + *(int *)return_code = -1; + pthread_exit(return_code); } /** - * ??? + * Called when the function in the sandbox exits + * Removes the standbox from the thread-local runqueue, sets its state to RETURNED, + * releases the linear memory, and then switches to the sandbox at the head of the runqueue + * TODO: Why are we not adding to the completion queue here? That logic is commented out. **/ void sandbox_exit(void) @@ -286,22 +309,23 @@ sandbox_exit(void) struct sandbox *current_sandbox = sandbox_current(); assert(current_sandbox); softint_disable(); + // Remove from the runqueue sandbox_local_stop(current_sandbox); current_sandbox->state = SANDBOX_RETURNED; // free resources from "main function execution", as stack still in use. - struct sandbox *n = sandbox_schedule(0); - assert(n != current_sandbox); + struct sandbox *next_sandbox = sandbox_schedule(0); + assert(next_sandbox != current_sandbox); softint_enable(); // unmap linear memory only! munmap(current_sandbox->linear_start, SBOX_MAX_MEM + PAGE_SIZE); // sandbox_local_end(current_sandbox); - sandbox_switch(n); + sandbox_switch(next_sandbox); } /** * @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the * sandbox object to the global dequeue - * @param d Unknown + * @param dummy data pointer provided by pthreads API. Unused in this function * @return NULL * * Used Globals: @@ -309,35 +333,42 @@ sandbox_exit(void) * */ void * -runtime_accept_thdfn(void *d) +runtime_accept_thdfn(void *dummy) { struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); int total_requests = 0; + while (true) { - int ready = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1); + int request_count = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1); u64 start_time = rdtsc(); - for (int i = 0; i < ready; i++) { + for (int i = 0; i < request_count; i++) { if (epoll_events[i].events & EPOLLERR) { perror("epoll_wait"); assert(0); } struct sockaddr_in client; - socklen_t client_len = sizeof(client); - struct module * m = (struct module *)epoll_events[i].data.ptr; - assert(m); - int es = m->socket_descriptor; - int s = accept(es, (struct sockaddr *)&client, &client_len); - if (s < 0) { + socklen_t client_length = sizeof(client); + struct module * module = (struct module *)epoll_events[i].data.ptr; + assert(module); + int es = module->socket_descriptor; + int socket_descriptor = accept(es, (struct sockaddr *)&client, &client_length); + if (socket_descriptor < 0) { perror("accept"); assert(0); } total_requests++; printf("Received Request %d at %lu\n", total_requests, start_time); - sbox_request_t *sb = sbox_request_alloc(m, m->name, s, (const struct sockaddr *)&client, - start_time); - assert(sb); + sbox_request_t *sandbox_request = sbox_request_alloc( + module, + module->name, + socket_descriptor, + (const struct sockaddr *)&client, + start_time); + assert(sandbox_request); + + // TODO: Refactor sbox_request_alloc to not add to global request queue and do this here } }