From f5f9c168c6beef04c68205f19ac959959d629f8b Mon Sep 17 00:00:00 2001
From: Sean McBride
Date: Tue, 21 Apr 2020 16:42:42 -0400
Subject: [PATCH] chore: Simplify worker scheduling

---
 runtime/include/sandbox.h | 2 +-
 runtime/src/runtime.c | 133 +++++++++++++++----------------
 runtime/src/software_interrupt.c | 48 +++++------
 3 files changed, 88 insertions(+), 95 deletions(-)

diff --git a/runtime/include/sandbox.h b/runtime/include/sandbox.h
index 9de6cb8..1b99a38 100644
--- a/runtime/include/sandbox.h
+++ b/runtime/include/sandbox.h
@@ -88,7 +88,7 @@ extern __thread arch_context_t *worker_thread_next_context;
 
 extern void worker_thread_block_current_sandbox(void);
 extern void worker_thread_exit_current_sandbox(void);
-extern struct sandbox *worker_thread_get_next_sandbox(bool is_in_interrupt);
+extern struct sandbox *worker_thread_get_next_sandbox(void);
 extern void worker_thread_process_io(void);
 extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void);
 extern void worker_thread_wakeup_sandbox(sandbox_t *sandbox);
diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c
index 1ff3f3e..375fea2 100644
--- a/runtime/src/runtime.c
+++ b/runtime/src/runtime.c
@@ -43,6 +43,7 @@ int runtime_epoll_file_descriptor;
 void
 runtime_initialize(void)
 {
+    // Setup epoll
     runtime_epoll_file_descriptor = epoll_create1(0);
     assert(runtime_epoll_file_descriptor >= 0);
 
@@ -82,13 +83,16 @@ listener_thread_main(void *dummy)
     while (true) {
         int request_count = epoll_wait(runtime_epoll_file_descriptor, epoll_events,
                                        LISTENER_THREAD_MAX_EPOLL_EVENTS, -1);
-        u64 start_time = __getcycles();
+
+        // Capture Start Time to calculate absolute deadline
+        u64 start_time = __getcycles();
         for (int i = 0; i < request_count; i++) {
             if (epoll_events[i].events & EPOLLERR) {
                 perror("epoll_wait");
                 assert(false);
             }
 
+            // Accept Client Request
             struct sockaddr_in client_address;
             socklen_t client_length = sizeof(client_address);
             struct module * module = (struct module *)epoll_events[i].data.ptr;
@@ -101,16 +105,18 @@ listener_thread_main(void *dummy)
             }
             total_requests++;
 
+            // Allocate a Sandbox Request
             sandbox_request_t *sandbox_request =
               sandbox_request_allocate(module, module->name, socket_descriptor,
                                        (const struct sockaddr *)&client_address, start_time);
             assert(sandbox_request);
+
+            // Add to the Global Sandbox Request Scheduler
             sandbox_request_scheduler_add(sandbox_request);
         }
     }
 
     free(epoll_events);
-
     return NULL;
 }
 
@@ -167,14 +173,24 @@
 static inline void
 worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
 {
     arch_context_t *next_register_context = next_sandbox == NULL ? NULL : &next_sandbox->ctxt;
+
     software_interrupt_disable();
-    struct sandbox *current_sandbox = current_sandbox_get();
-    arch_context_t *current_register_context = current_sandbox == NULL ? NULL : &current_sandbox->ctxt;
+
+    // Get the old sandbox we're switching from
+    struct sandbox *previous_sandbox = current_sandbox_get();
+    arch_context_t *previous_register_context = previous_sandbox == NULL ? NULL : &previous_sandbox->ctxt;
+
+    // Set the current sandbox to the next
     current_sandbox_set(next_sandbox);
-    // If the current sandbox we're switching from is in a RETURNED state, add to completion queue
-    if (current_sandbox && current_sandbox->state == RETURNED) sandbox_completion_queue_add(current_sandbox);
+
+    // and switch to the associated context. But what is the purpose of worker_thread_next_context?
     worker_thread_next_context = next_register_context;
-    arch_context_switch(current_register_context, next_register_context);
+    arch_context_switch(previous_register_context, next_register_context);
+
+    // If the current sandbox we're switching from is in a RETURNED state, add to completion queue
+    if (previous_sandbox != NULL && previous_sandbox->state == RETURNED)
+        sandbox_completion_queue_add(previous_sandbox);
+
     software_interrupt_enable();
 }
 
@@ -187,10 +203,10 @@ worker_thread_wakeup_sandbox(sandbox_t *sandbox)
 {
     software_interrupt_disable();
     // debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
-    if (sandbox->state != BLOCKED) goto done;
-    sandbox->state = RUNNABLE;
-    sandbox_run_queue_append(sandbox);
-done:
+    if (sandbox->state == BLOCKED) {
+        sandbox->state = RUNNABLE;
+        sandbox_run_queue_append(sandbox);
+    }
     software_interrupt_enable();
 }
 
@@ -205,13 +221,14 @@ worker_thread_block_current_sandbox(void)
 {
     assert(worker_thread_is_in_callback == false);
     software_interrupt_disable();
-    struct sandbox *current_sandbox = current_sandbox_get();
-    sandbox_run_queue_remove(current_sandbox);
-    current_sandbox->state = BLOCKED;
-
-    struct sandbox *next_sandbox = worker_thread_get_next_sandbox(false);
+    // Remove the sandbox we were just executing from the runqueue and mark as blocked
+    struct sandbox *previous_sandbox = current_sandbox_get();
+    sandbox_run_queue_remove(previous_sandbox);
+    previous_sandbox->state = BLOCKED;
-    debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name,
+    // Switch to the next sandbox
+    struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
+    debuglog("[%p: %s, %p: %s]\n", previous_sandbox, previous_sandbox->module->name,
              next_sandbox, next_sandbox ? next_sandbox->module->name : "");
     software_interrupt_enable();
     worker_thread_switch_to_sandbox(next_sandbox);
 }
 
@@ -295,47 +312,26 @@ worker_thread_execute_libuv_event_loop(void)
 /**
  * Execute the sandbox at the head of the thread local runqueue
  * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
- * @param in_interrupt if this is getting called in the context of an interrupt
  * @return the sandbox to execute or NULL if none are available
 **/
 struct sandbox *
-worker_thread_get_next_sandbox(bool is_in_interrupt)
+worker_thread_get_next_sandbox()
 {
-    // If the thread local runqueue is empty and we're not running in the context of an interupt,
-    // pull a fresh batch of sandbox requests from the global queue
     if (sandbox_run_queue_is_empty()) {
-        if (is_in_interrupt) return NULL;
-        if (worker_thread_pull_and_process_sandbox_requests() == 0) {
-            // debuglog("[null: null]\n");
-            return NULL;
-        }
+        int sandboxes_pulled = worker_thread_pull_and_process_sandbox_requests();
+        if (sandboxes_pulled == 0) return NULL;
     }
 
     // Execute Round Robin Scheduling Logic
-    // Grab the sandbox at the head of the thread local runqueue, add it to the end, and return it
-    struct sandbox *sandbox = sandbox_run_queue_get_head();
-    // We are assuming that any sandboxed in the RETURNED state should have been pulled from the local runqueue by
-    // now!
-    assert(sandbox->state != RETURNED);
-    ps_list_rem_d(sandbox);
-    sandbox_run_queue_append(sandbox);
-    debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
-    return sandbox;
-}
+    struct sandbox *next_sandbox = sandbox_run_queue_get_head();
+    assert(next_sandbox->state != RETURNED);
+    sandbox_run_queue_remove(next_sandbox);
+    sandbox_run_queue_append(next_sandbox);
-/**
- * Tries to free a completed request, executes libuv callbacks
- * @return sandbox or NULL
- **/
-static inline void
-worker_thread_execute_runtime_maintenance(void)
-{
-    assert(current_sandbox_get() == NULL);
-    sandbox_completion_queue_free(1);
-    if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
+    debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name);
+    return next_sandbox;
 }
-
 /**
  * The entry function for sandbox worker threads
  * Initializes thread-local state, unmasks signals, sets up libuv loop and
@@ -344,8 +340,8 @@
 void *
 worker_thread_main(void *return_code)
 {
+    // Initialize Worker State
     arch_context_init(&worker_thread_base_context, 0, 0);
-
     sandbox_run_queue_initialize();
     sandbox_completion_queue_initialize();
     software_interrupt_is_disabled = false;
@@ -357,19 +353,21 @@ worker_thread_main(void *return_code)
     uv_loop_init(&worker_thread_uvio_handle);
     worker_thread_is_in_callback = false;
+
+    // Begin Worker Execution Loop
+    struct sandbox *next_sandbox;
     while (true) {
-        worker_thread_execute_runtime_maintenance();
+        assert(current_sandbox_get() == NULL);
+        // If "in a callback", the libuv event loop is triggering this, so we don't need to start it
+        if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
+
         software_interrupt_disable();
-        struct sandbox *sandbox = worker_thread_get_next_sandbox(false);
+        next_sandbox = worker_thread_get_next_sandbox();
         software_interrupt_enable();
-        assert(sandbox == NULL || sandbox->state == RUNNABLE);
-        while (sandbox) {
-            worker_thread_switch_to_sandbox(sandbox);
-            worker_thread_execute_runtime_maintenance();
-            software_interrupt_disable();
-            sandbox = worker_thread_get_next_sandbox(false);
-            software_interrupt_enable();
-            assert(sandbox == NULL || sandbox->state == RUNNABLE);
+
+        if (next_sandbox != NULL) {
+            worker_thread_switch_to_sandbox(next_sandbox);
+            sandbox_completion_queue_free(1);
         }
     }
 
@@ -386,17 +384,18 @@
 void
 worker_thread_exit_current_sandbox(void)
 {
-    struct sandbox *current_sandbox = current_sandbox_get();
-    assert(current_sandbox);
+    // Remove the sandbox that exited from the runqueue and set state to RETURNED
+    struct sandbox *previous_sandbox = current_sandbox_get();
+    assert(previous_sandbox);
     software_interrupt_disable();
-    sandbox_run_queue_remove(current_sandbox);
-    current_sandbox->state = RETURNED;
+    sandbox_run_queue_remove(previous_sandbox);
+    previous_sandbox->state = RETURNED;
 
-    struct sandbox *next_sandbox = worker_thread_get_next_sandbox(true);
-    assert(next_sandbox != current_sandbox);
+    struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
+    assert(next_sandbox != previous_sandbox);
     software_interrupt_enable();
-    // free resources from "main function execution", as stack still in use.
-    // unmap linear memory only!
-    munmap(current_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
+    // Because the stack is still in use, only unmap the linear memory now and defer freeing the remaining
+    // resources until "main function execution"
+    munmap(previous_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
     worker_thread_switch_to_sandbox(next_sandbox);
 }
diff --git a/runtime/src/software_interrupt.c b/runtime/src/software_interrupt.c
index fcaa05c..2582aa5 100644
--- a/runtime/src/software_interrupt.c
+++ b/runtime/src/software_interrupt.c
@@ -55,24 +55,22 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
 #ifdef PREEMPT_DISABLE
     assert(0);
 #else
-    struct sandbox *current_sandbox = current_sandbox_get();
     ucontext_t * user_context = (ucontext_t *)user_context_raw;
+    struct sandbox *current_sandbox = current_sandbox_get();
 
     switch (signal_type) {
     case SIGALRM: {
-        // if interrupts are disabled.. increment a per_thread counter and return
+        // SIGALRM is the preemption signal that occurs every quantum of execution
+
         if (signal_info->si_code == SI_KERNEL) {
-            int rt = 0;
-            // deliver signal to all other runtime threads..
+            // deliver signal to all other worker threads..
            for (int i = 0; i < WORKER_THREAD_CORE_COUNT; i++) {
-                if (pthread_self() == runtime_worker_threads[i]) {
-                    rt = 1;
-                    continue;
-                }
+                if (pthread_self() == runtime_worker_threads[i]) continue;
                 pthread_kill(runtime_worker_threads[i], SIGALRM);
             }
-            assert(rt == 1);
         } else {
+            // TODO: why SI_TKILL? SIGALRMs relayed from another worker via pthread_kill carry si_code SI_TKILL
             assert(signal_info->si_code == SI_TKILL);
         }
         // debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
@@ -82,7 +80,7 @@
 
         if (current_sandbox && current_sandbox->state == RETURNED) return;
         if (worker_thread_next_context) return;
         if (!software_interrupt_is_enabled()) return;
-        software_interrupt_schedule_alarm(user_context_raw);
+        software_interrupt_schedule_alarm(user_context);
         break;
     }
@@ -120,28 +118,24 @@ software_interrupt_schedule_alarm(void *user_context_raw)
     software_interrupt_disable(); // no nesting!
 
     struct sandbox *current_sandbox = current_sandbox_get();
-    ucontext_t * user_context = (ucontext_t *)user_context_raw;
-    // no sandboxes running..so nothing to preempt..let the "main" scheduler run its course.
-    if (current_sandbox == NULL) goto done;
+    // If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course.
+    if (current_sandbox != NULL) {
+        // find a next sandbox to run..
+        struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
-    // find a next sandbox to run..
-    struct sandbox *next_sandbox = worker_thread_get_next_sandbox(true);
-    if (next_sandbox == NULL) goto done;
-    if (next_sandbox == current_sandbox) goto done; // only this sandbox to schedule.. return to it!
-    // save the current sandbox, state from user_context!
-    arch_mcontext_save(&current_sandbox->ctxt, &user_context->uc_mcontext);
+        if (next_sandbox != NULL && next_sandbox != current_sandbox) {
+            ucontext_t *user_context = (ucontext_t *)user_context_raw;
-    // current_sandbox_set on it. restore through *user_context..
-    current_sandbox_set(next_sandbox);
+            // Save context to the sandbox we're switching from
+            arch_mcontext_save(&current_sandbox->ctxt, &user_context->uc_mcontext);
-    if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt)) goto skip;
-    // reset if SIGALRM happens before SIGUSR1 and if don't preempt..OR
-    // perhaps switch here for SIGUSR1 and see if we can clear that signal
-    // so it doesn't get called on SIGALRM return..
-    // worker_thread_next_context = NULL;
+            // Make next_sandbox the current sandbox and restore its context through *user_context
+            current_sandbox_set(next_sandbox);
+            if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt)) goto skip;
+        }
+    }
-done:
     software_interrupt_enable();
 skip:
     return;
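
Note: with this change, worker_thread_get_next_sandbox() always rotates the head of the thread-local runqueue to the tail before returning it, which is what produces the round-robin order. The standalone sketch below illustrates only that rotation with a toy fixed-size array queue; the toy_queue_* names are made up for illustration and are not the runtime's ps_list-based run queue API.

#include <assert.h>
#include <stdio.h>

/* Toy fixed-capacity run queue holding sandbox ids (stand-ins for struct sandbox pointers) */
#define TOY_QUEUE_CAPACITY 8

struct toy_queue {
    int items[TOY_QUEUE_CAPACITY];
    int length;
};

static int
toy_queue_is_empty(struct toy_queue *queue)
{
    return queue->length == 0;
}

static void
toy_queue_append(struct toy_queue *queue, int item)
{
    assert(queue->length < TOY_QUEUE_CAPACITY);
    queue->items[queue->length++] = item;
}

/* Remove and return the head, shifting the remaining items forward */
static int
toy_queue_remove_head(struct toy_queue *queue)
{
    assert(queue->length > 0);
    int head = queue->items[0];
    for (int i = 1; i < queue->length; i++) queue->items[i - 1] = queue->items[i];
    queue->length--;
    return head;
}

/* Same shape as the simplified worker_thread_get_next_sandbox: take the head,
   move it to the tail, and return it, which yields round-robin order */
static int
toy_get_next(struct toy_queue *queue)
{
    if (toy_queue_is_empty(queue)) return -1; /* nothing runnable */
    int next = toy_queue_remove_head(queue);
    toy_queue_append(queue, next);
    return next;
}

int
main(void)
{
    struct toy_queue queue = { .length = 0 };
    for (int id = 1; id <= 3; id++) toy_queue_append(&queue, id);

    /* Prints: 1 2 3 1 2 3 */
    for (int i = 0; i < 6; i++) printf("%d ", toy_get_next(&queue));
    printf("\n");
    return 0;
}

Running the toy program prints "1 2 3 1 2 3": every queued entry gets a turn before any entry repeats, which is the fairness property the simplified per-worker scheduler relies on.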