From 0f0d0fcb18ceae87b214660687ebdb26d4eac05f Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Thu, 13 May 2021 15:12:22 -0400 Subject: [PATCH] feat: rework of scheduler logic --- .vscode/settings.json | 8 +- runtime/include/arch/context.h | 3 - runtime/include/arch/x86_64/context.h | 1 - runtime/include/current_sandbox_block.h | 42 ---- runtime/include/current_sandbox_yield.h | 45 ---- runtime/include/local_runqueue.h | 3 - runtime/include/sandbox_receive_request.h | 4 +- runtime/include/sandbox_send_response.h | 4 +- runtime/include/scheduler.h | 198 ++++++++++++++++++ .../worker_thread_execute_epoll_loop.h | 82 ++++++++ runtime/src/current_sandbox.c | 4 +- runtime/src/libc/syscall.c | 6 +- runtime/src/local_runqueue.c | 11 - runtime/src/local_runqueue_list.c | 14 +- runtime/src/local_runqueue_minheap.c | 121 +---------- runtime/src/sandbox.c | 59 ------ runtime/src/software_interrupt.c | 14 +- runtime/src/worker_thread.c | 79 +------ 18 files changed, 314 insertions(+), 384 deletions(-) delete mode 100644 runtime/include/current_sandbox_block.h delete mode 100644 runtime/include/current_sandbox_yield.h create mode 100644 runtime/include/scheduler.h create mode 100644 runtime/include/worker_thread_execute_epoll_loop.h diff --git a/.vscode/settings.json b/.vscode/settings.json index 9d928bf..dbb5187 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -76,14 +76,16 @@ "local_runqueue.h": "c", "software_interrupt.h": "c", "sandbox_set_as_runnable.h": "c", - "current_sandbox_yield.h": "c", "sandbox_set_as_complete.h": "c", "deque.h": "c", "sandbox_request.h": "c", "sandbox_send_response.h": "c", "sandbox_setup_arguments.h": "c", "worker_thread.h": "c", - "sandbox_set_as_returned.h": "c" + "sandbox_set_as_error.h": "c", + "likely.h": "c", + "debuglog.h": "c", + "worker_thread_execute_epoll_loop.h": "c" }, "files.exclude": { "**/.git": true, @@ -138,4 +140,4 @@ "*.bc": true, "*.wasm": true, } -} +} \ No newline at end of file diff --git a/runtime/include/arch/context.h b/runtime/include/arch/context.h index 72443aa..7759f74 100644 --- a/runtime/include/arch/context.h +++ b/runtime/include/arch/context.h @@ -56,9 +56,6 @@ arch_mcontext_restore(mcontext_t *active_context, struct arch_context *sandbox_c /* Restore mcontext */ memcpy(active_context, &sandbox_context->mctx, sizeof(mcontext_t)); - - /* Reenable software interrupts if we restored a preemptable sandbox */ - if (sandbox_context->preemptable) software_interrupt_enable(); } diff --git a/runtime/include/arch/x86_64/context.h b/runtime/include/arch/x86_64/context.h index 5f90818..2a31142 100644 --- a/runtime/include/arch/x86_64/context.h +++ b/runtime/include/arch/x86_64/context.h @@ -52,7 +52,6 @@ arch_context_init(struct arch_context *actx, reg_t ip, reg_t sp) /** * Load a new sandbox that preempted an existing sandbox, restoring only the * instruction pointer and stack pointer registers. - * I am unclear about setting the BP. Issue #131 * @param active_context - the context of the current worker thread * @param sandbox_context - the context that we want to restore */ diff --git a/runtime/include/current_sandbox_block.h b/runtime/include/current_sandbox_block.h deleted file mode 100644 index 6904088..0000000 --- a/runtime/include/current_sandbox_block.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include -#include - -#include "current_sandbox.h" -#include "current_sandbox_yield.h" -#include "generic_thread.h" -#include "local_runqueue.h" -#include "sandbox_set_as_blocked.h" -#include "software_interrupt.h" - -/** - * Mark the currently executing sandbox as blocked, remove it from the local runqueue, - * and switch to base context - */ -static inline void -current_sandbox_block(void) -{ - /* Remove the sandbox we were just executing from the runqueue and mark as blocked */ - struct sandbox *current_sandbox = current_sandbox_get(); - - /* We might either have blocked in start reading the request or while executing within the WebAssembly - * entrypoint. The preemptable flag on the context is used to differentiate. In either case, we should - * have disabled interrupts. - */ - if (current_sandbox->ctxt.preemptable) software_interrupt_disable(); - assert(!software_interrupt_is_enabled()); - - assert(current_sandbox->state == SANDBOX_RUNNING); - sandbox_set_as_blocked(current_sandbox, SANDBOX_RUNNING); - generic_thread_dump_lock_overhead(); - - /* The worker thread seems to "spin" on a blocked sandbox, so try to execute another sandbox for one quantum - * after blocking to give time for the action to resolve */ - struct sandbox *next_sandbox = local_runqueue_get_next(); - if (next_sandbox != NULL) { - sandbox_switch_to(next_sandbox); - } else { - current_sandbox_yield(); - }; -} diff --git a/runtime/include/current_sandbox_yield.h b/runtime/include/current_sandbox_yield.h deleted file mode 100644 index 40d3730..0000000 --- a/runtime/include/current_sandbox_yield.h +++ /dev/null @@ -1,45 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "arch/context.h" -#include "current_sandbox.h" -#include "sandbox_types.h" -#include "sandbox_exit.h" -#include "software_interrupt.h" - -/** - * @brief Switches to the base context, placing the current sandbox on the completion queue if in RETURNED state - */ -static inline void -current_sandbox_yield() -{ - assert(!software_interrupt_is_enabled()); - - struct sandbox *current_sandbox = current_sandbox_get(); -#ifndef NDEBUG - if (current_sandbox != NULL) { - assert(current_sandbox->state < SANDBOX_STATE_COUNT); - assert(current_sandbox->stack_size == current_sandbox->module->stack_size); - } -#endif - - /* Assumption: Base Context should never switch to Base Context */ - assert(current_sandbox != NULL); - struct arch_context *current_context = ¤t_sandbox->ctxt; - assert(current_context != &worker_thread_base_context); - -#ifdef LOG_CONTEXT_SWITCHES - debuglog("Sandbox %lu (@%p) (%s) > Base Context (@%p) (%s)\n", current_sandbox->id, current_context, - arch_context_variant_print(current_sandbox->ctxt.variant), &worker_thread_base_context, - arch_context_variant_print(worker_thread_base_context.variant)); -#endif - - sandbox_exit(current_sandbox); - current_sandbox_set(NULL); - assert(worker_thread_base_context.variant == ARCH_CONTEXT_VARIANT_FAST); - runtime_worker_threads_deadline[worker_thread_idx] = UINT64_MAX; - arch_context_switch(current_context, &worker_thread_base_context); -} diff --git a/runtime/include/local_runqueue.h b/runtime/include/local_runqueue.h index 4a24316..432eea1 100644 --- a/runtime/include/local_runqueue.h +++ b/runtime/include/local_runqueue.h @@ -9,14 +9,12 @@ typedef void (*local_runqueue_add_fn_t)(struct sandbox *); typedef bool (*local_runqueue_is_empty_fn_t)(void); typedef void (*local_runqueue_delete_fn_t)(struct sandbox *sandbox); typedef struct sandbox *(*local_runqueue_get_next_fn_t)(); -typedef void (*local_runqueue_preempt_fn_t)(ucontext_t *); struct local_runqueue_config { local_runqueue_add_fn_t add_fn; local_runqueue_is_empty_fn_t is_empty_fn; local_runqueue_delete_fn_t delete_fn; local_runqueue_get_next_fn_t get_next_fn; - local_runqueue_preempt_fn_t preempt_fn; }; void local_runqueue_add(struct sandbox *); @@ -24,4 +22,3 @@ void local_runqueue_delete(struct sandbox *); bool local_runqueue_is_empty(); struct sandbox *local_runqueue_get_next(); void local_runqueue_initialize(struct local_runqueue_config *config); -void local_runqueue_preempt(ucontext_t *); diff --git a/runtime/include/sandbox_receive_request.h b/runtime/include/sandbox_receive_request.h index 213c6c2..60c444e 100644 --- a/runtime/include/sandbox_receive_request.h +++ b/runtime/include/sandbox_receive_request.h @@ -8,13 +8,13 @@ #include #include "current_sandbox.h" -#include "current_sandbox_block.h" #include "debuglog.h" #include "http_parser.h" #include "http_request.h" #include "http_parser_settings.h" #include "likely.h" #include "sandbox_types.h" +#include "scheduler.h" /** * Receive and Parse the Request for the current sandbox @@ -44,7 +44,7 @@ sandbox_receive_request(struct sandbox *sandbox) if (recved < 0) { if (errno == EAGAIN) { - current_sandbox_block(); + scheduler_block(); continue; } else { /* All other errors */ diff --git a/runtime/include/sandbox_send_response.h b/runtime/include/sandbox_send_response.h index c095c17..4dfb4e3 100644 --- a/runtime/include/sandbox_send_response.h +++ b/runtime/include/sandbox_send_response.h @@ -9,11 +9,11 @@ #include #include "current_sandbox.h" -#include "current_sandbox_block.h" #include "http.h" #include "http_total.h" #include "likely.h" #include "sandbox_types.h" +#include "scheduler.h" #include "panic.h" /** @@ -105,7 +105,7 @@ sandbox_send_response(struct sandbox *sandbox) response_cursor - sent); if (rc < 0) { if (errno == EAGAIN) - current_sandbox_block(); + scheduler_block(); else { perror("write"); return -1; diff --git a/runtime/include/scheduler.h b/runtime/include/scheduler.h new file mode 100644 index 0000000..ea9f3c0 --- /dev/null +++ b/runtime/include/scheduler.h @@ -0,0 +1,198 @@ +#pragma once + +#include +#include +#include + +#include "client_socket.h" +#include "global_request_scheduler.h" +#include "local_runqueue.h" +#include "sandbox_request.h" +#include "sandbox_exit.h" +#include "sandbox_functions.h" +#include "sandbox_types.h" +#include "sandbox_set_as_blocked.h" +#include "sandbox_set_as_preempted.h" +#include "sandbox_set_as_runnable.h" +#include "sandbox_set_as_running.h" +#include "worker_thread_execute_epoll_loop.h" + +static inline struct sandbox * +scheduler_get_next() +{ + assert(!software_interrupt_is_enabled()); + + /* Get the deadline of the sandbox at the head of the local request queue */ + struct sandbox *local = local_runqueue_get_next(); + uint64_t local_deadline = local == NULL ? UINT64_MAX : local->absolute_deadline; + + uint64_t global_deadline = global_request_scheduler_peek(); + + /* Try to pull and allocate from the global queue if earlier + * This will be placed at the head of the local runqueue */ + if (global_deadline < local_deadline) { + struct sandbox_request *request = NULL; + int return_code = global_request_scheduler_remove_if_earlier(&request, local_deadline); + if (return_code == 0) { + assert(request != NULL); + assert(request->absolute_deadline < local_deadline); + struct sandbox *global = sandbox_allocate(request); + if (!global) { + client_socket_send(request->socket_descriptor, 503); + client_socket_close(request->socket_descriptor, &request->socket_address); + free(request); + debuglog("scheduler failed to allocate sandbox\n"); + } else { + assert(global->state == SANDBOX_INITIALIZED); + sandbox_set_as_runnable(global, SANDBOX_INITIALIZED); + } + } + } + + /* Return what is at the head of the local runqueue or NULL if empty */ + return local_runqueue_get_next(); +} + +/** + * Called by the SIGALRM handler after a quantum + * Assumes the caller validates that there is something to preempt + * @param user_context - The context of our user-level Worker thread + */ +static inline void +scheduler_preempt(ucontext_t *user_context) +{ + assert(user_context != NULL); + assert(!software_interrupt_is_enabled()); + + /* Process epoll to make sure that all runnable jobs are considered for execution */ + worker_thread_execute_epoll_loop(); + + struct sandbox *current = current_sandbox_get(); + assert(current != NULL); + assert(current->state == SANDBOX_RUNNING); + + struct sandbox *next = scheduler_get_next(); + assert(next != NULL); + + /* If current equals return, we are already running earliest deadline, so resume execution */ + if (current == next) return; + + /* Save the context of the currently executing sandbox before switching from it */ + sandbox_set_as_preempted(current, SANDBOX_RUNNING); + arch_mcontext_save(¤t->ctxt, &user_context->uc_mcontext); + + /* Update current_sandbox to the next sandbox */ + assert(next->state == SANDBOX_RUNNABLE); + sandbox_set_as_running(next, SANDBOX_RUNNABLE); + + /* Update the current deadline of the worker thread */ + runtime_worker_threads_deadline[worker_thread_idx] = next->absolute_deadline; + + /* Restore the context of this sandbox */ + arch_context_restore_new(&user_context->uc_mcontext, &next->ctxt); +} + +/** + * @brief Switches to the next sandbox, placing the current sandbox on the completion queue if in SANDBOX_RETURNED state + * @param next_sandbox The Sandbox Context to switch to + */ +static inline void +scheduler_switch_to(struct sandbox *next_sandbox) +{ + /* Assumption: The caller disables interrupts */ + assert(!software_interrupt_is_enabled()); + + assert(next_sandbox != NULL); + struct arch_context *next_context = &next_sandbox->ctxt; + + /* Get the old sandbox we're switching from. + * This is null if switching from base context + */ + struct sandbox * current_sandbox = current_sandbox_get(); + struct arch_context *current_context = NULL; + if (current_sandbox != NULL) current_context = ¤t_sandbox->ctxt; + + assert(next_sandbox != current_sandbox); + + /* Update the worker's absolute deadline */ + runtime_worker_threads_deadline[worker_thread_idx] = next_sandbox->absolute_deadline; + + if (current_sandbox == NULL) { + /* Switching from "Base Context" */ +#ifdef LOG_CONTEXT_SWITCHES + debuglog("Base Context (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", &worker_thread_base_context, + arch_context_variant_print(worker_thread_base_context.variant), next_sandbox->id, next_context, + arch_context_variant_print(next_context->variant)); +#endif + } else { +#ifdef LOG_CONTEXT_SWITCHES + debuglog("Sandbox %lu (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", current_sandbox->id, + ¤t_sandbox->ctxt, arch_context_variant_print(current_sandbox->ctxt.variant), + next_sandbox->id, &next_sandbox->ctxt, arch_context_variant_print(next_context->variant)); +#endif + + sandbox_exit(current_sandbox); + } + + sandbox_set_as_running(next_sandbox, next_sandbox->state); + arch_context_switch(current_context, next_context); +} + + +/** + * @brief Switches to the base context, placing the current sandbox on the completion queue if in RETURNED state + */ +static inline void +scheduler_yield() +{ + assert(!software_interrupt_is_enabled()); + + struct sandbox *current_sandbox = current_sandbox_get(); +#ifndef NDEBUG + if (current_sandbox != NULL) { + assert(current_sandbox->state < SANDBOX_STATE_COUNT); + assert(current_sandbox->stack_size == current_sandbox->module->stack_size); + } +#endif + + /* Assumption: Base Context should never switch to Base Context */ + assert(current_sandbox != NULL); + struct arch_context *current_context = ¤t_sandbox->ctxt; + assert(current_context != &worker_thread_base_context); + +#ifdef LOG_CONTEXT_SWITCHES + debuglog("Sandbox %lu (@%p) (%s) > Base Context (@%p) (%s)\n", current_sandbox->id, current_context, + arch_context_variant_print(current_sandbox->ctxt.variant), &worker_thread_base_context, + arch_context_variant_print(worker_thread_base_context.variant)); +#endif + + sandbox_exit(current_sandbox); + current_sandbox_set(NULL); + assert(worker_thread_base_context.variant == ARCH_CONTEXT_VARIANT_FAST); + runtime_worker_threads_deadline[worker_thread_idx] = UINT64_MAX; + arch_context_switch(current_context, &worker_thread_base_context); +} + +/** + * Mark the currently executing sandbox as blocked, remove it from the local runqueue, + * and switch to base context + */ +static inline void +scheduler_block(void) +{ + /* Remove the sandbox we were just executing from the runqueue and mark as blocked */ + struct sandbox *current_sandbox = current_sandbox_get(); + + /* We might either have blocked in start reading the request or while executing within the WebAssembly + * entrypoint. The preemptable flag on the context is used to differentiate. In either case, we should + * have disabled interrupts. + */ + if (current_sandbox->ctxt.preemptable) software_interrupt_disable(); + assert(!software_interrupt_is_enabled()); + + assert(current_sandbox->state == SANDBOX_RUNNING); + sandbox_set_as_blocked(current_sandbox, SANDBOX_RUNNING); + generic_thread_dump_lock_overhead(); + + scheduler_yield(); +} diff --git a/runtime/include/worker_thread_execute_epoll_loop.h b/runtime/include/worker_thread_execute_epoll_loop.h new file mode 100644 index 0000000..e6b6c70 --- /dev/null +++ b/runtime/include/worker_thread_execute_epoll_loop.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include + +#include "client_socket.h" +#include "panic.h" +#include "runtime.h" +#include "sandbox_functions.h" +#include "sandbox_set_as_error.h" +#include "sandbox_set_as_runnable.h" +#include "sandbox_state.h" +#include "sandbox_types.h" +#include "software_interrupt.h" +#include "worker_thread.h" + + +/** + * Run all outstanding events in the local thread's epoll loop + */ +static inline void +worker_thread_execute_epoll_loop(void) +{ + assert(!software_interrupt_is_enabled()); + while (true) { + struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS]; + int descriptor_count = epoll_wait(worker_thread_epoll_file_descriptor, epoll_events, + RUNTIME_MAX_EPOLL_EVENTS, 0); + + if (descriptor_count < 0) { + if (errno == EINTR) continue; + + panic_err(); + } + + if (descriptor_count == 0) break; + + for (int i = 0; i < descriptor_count; i++) { + if (epoll_events[i].events & (EPOLLIN | EPOLLOUT)) { + /* Re-add to runqueue if blocked */ + struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr; + assert(sandbox); + + if (sandbox->state == SANDBOX_BLOCKED) { + sandbox_set_as_runnable(sandbox, SANDBOX_BLOCKED); + } + } else if (epoll_events[i].events & (EPOLLERR | EPOLLHUP)) { + /* Mystery: This seems to never fire. Why? Issue #130 */ + + /* Close socket and set as error on socket error or unexpected client hangup */ + struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr; + int error = 0; + socklen_t errlen = sizeof(error); + getsockopt(epoll_events[i].data.fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + + if (error > 0) { + debuglog("Socket error: %s", strerror(error)); + } else if (epoll_events[i].events & EPOLLHUP) { + debuglog("Client Hungup"); + } else { + debuglog("Unknown Socket error"); + } + + switch (sandbox->state) { + case SANDBOX_SET_AS_RETURNED: + case SANDBOX_RETURNED: + case SANDBOX_SET_AS_COMPLETE: + case SANDBOX_COMPLETE: + case SANDBOX_SET_AS_ERROR: + case SANDBOX_ERROR: + panic("Expected to have closed socket"); + default: + client_socket_send(sandbox->client_socket_descriptor, 503); + sandbox_close_http(sandbox); + sandbox_set_as_error(sandbox, sandbox->state); + } + } else { + panic("Mystery epoll event!\n"); + }; + } + } +} diff --git a/runtime/src/current_sandbox.c b/runtime/src/current_sandbox.c index f88bd5e..385ae6c 100644 --- a/runtime/src/current_sandbox.c +++ b/runtime/src/current_sandbox.c @@ -1,11 +1,11 @@ #include "current_sandbox.h" -#include "current_sandbox_yield.h" #include "sandbox_functions.h" #include "sandbox_receive_request.h" #include "sandbox_send_response.h" #include "sandbox_set_as_error.h" #include "sandbox_set_as_returned.h" #include "sandbox_setup_arguments.h" +#include "scheduler.h" // /* current sandbox that is active.. */ __thread struct sandbox *worker_thread_current_sandbox = NULL; @@ -95,7 +95,7 @@ current_sandbox_start(void) done: /* Cleanup connection and exit sandbox */ generic_thread_dump_lock_overhead(); - current_sandbox_yield(); + scheduler_yield(); /* This assert prevents a segfault discussed in * https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66 diff --git a/runtime/src/libc/syscall.c b/runtime/src/libc/syscall.c index e6a5827..ba84390 100644 --- a/runtime/src/libc/syscall.c +++ b/runtime/src/libc/syscall.c @@ -11,7 +11,7 @@ #include #include "current_sandbox.h" -#include "current_sandbox_block.h" +#include "scheduler.h" #include "sandbox_functions.h" #include "worker_thread.h" @@ -114,7 +114,7 @@ wasm_read(int32_t filedes, int32_t buf_offset, int32_t nbyte) int32_t length_read = (int32_t)read(filedes, buf, nbyte); if (length_read < 0) { if (errno == EAGAIN) - current_sandbox_block(); + scheduler_block(); else { /* All other errors */ debuglog("Error reading socket %d - %s\n", filedes, strerror(errno)); @@ -157,7 +157,7 @@ wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size) int32_t length_written = (int32_t)write(f, buf, buf_size); if (length_written < 0) { if (errno == EAGAIN) - current_sandbox_block(); + scheduler_block(); else { /* All other errors */ debuglog("Error reading socket %d - %s\n", fd, strerror(errno)); diff --git a/runtime/src/local_runqueue.c b/runtime/src/local_runqueue.c index b28c5cd..b87f66e 100644 --- a/runtime/src/local_runqueue.c +++ b/runtime/src/local_runqueue.c @@ -67,14 +67,3 @@ local_runqueue_get_next() assert(local_runqueue.get_next_fn != NULL); return local_runqueue.get_next_fn(); }; - -/** - * Preempt the current sandbox according to the scheduler variant - * @param context - */ -void -local_runqueue_preempt(ucontext_t *context) -{ - assert(local_runqueue.preempt_fn != NULL); - return local_runqueue.preempt_fn(context); -}; diff --git a/runtime/src/local_runqueue_list.c b/runtime/src/local_runqueue_list.c index 0fb0343..a947fc9 100644 --- a/runtime/src/local_runqueue_list.c +++ b/runtime/src/local_runqueue_list.c @@ -88,17 +88,6 @@ local_runqueue_list_append(struct sandbox *sandbox_to_append) ps_list_head_append_d(&local_runqueue_list, sandbox_to_append); } -/** - * Conditionally checks to see if current sandbox should be preempted. - * FIFO doesn't preempt, so just return. - */ -void -local_runqueue_list_preempt(ucontext_t *user_context) -{ - return; -} - - void local_runqueue_list_initialize() { @@ -108,7 +97,6 @@ local_runqueue_list_initialize() struct local_runqueue_config config = { .add_fn = local_runqueue_list_append, .is_empty_fn = local_runqueue_list_is_empty, .delete_fn = local_runqueue_list_remove, - .get_next_fn = local_runqueue_list_get_next, - .preempt_fn = local_runqueue_list_preempt }; + .get_next_fn = local_runqueue_list_get_next }; local_runqueue_initialize(&config); }; diff --git a/runtime/src/local_runqueue_minheap.c b/runtime/src/local_runqueue_minheap.c index f01c6cb..25cd6f5 100644 --- a/runtime/src/local_runqueue_minheap.c +++ b/runtime/src/local_runqueue_minheap.c @@ -70,121 +70,13 @@ local_runqueue_minheap_get_next() { assert(!software_interrupt_is_enabled()); - struct sandbox * sandbox = NULL; - struct sandbox_request *sandbox_request = NULL; - int sandbox_rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&sandbox); + /* Get the deadline of the sandbox at the head of the local request queue */ + struct sandbox *next = NULL; + int rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&next); - while (sandbox_rc == -ENOENT && global_request_scheduler_peek() < ULONG_MAX && sandbox == NULL) { - /* local runqueue empty, try to pull a sandbox request */ - if (global_request_scheduler_remove(&sandbox_request) < 0) { - /* Assumption: Sandbox request should not be set in case of an error */ - assert(sandbox_request == NULL); - goto done; - } + if (rc == -ENOENT) return NULL; - /* Try to allocate a sandbox. Try again on failure */ - sandbox = sandbox_allocate(sandbox_request); - if (!sandbox) { - client_socket_send(sandbox_request->socket_descriptor, 503); - client_socket_close(sandbox_request->socket_descriptor, &sandbox->client_address); - free(sandbox_request); - continue; - }; - - assert(sandbox->state == SANDBOX_INITIALIZED); - sandbox_set_as_runnable(sandbox, SANDBOX_INITIALIZED); - } - -done: - return sandbox; -err: - sandbox = NULL; - goto done; -} - - -/** - * Called by the SIGALRM handler after a quantum - * Assumes the caller validates that there is something to preempt - * @param user_context - The context of our user-level Worker thread - */ -void -local_runqueue_minheap_preempt(ucontext_t *user_context) -{ - assert(user_context != NULL); - assert(!software_interrupt_is_enabled()); - - struct sandbox *current_sandbox = current_sandbox_get(); - - /* If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course. */ - if (current_sandbox == NULL) return; - - /* The current sandbox should be the head of the runqueue */ - assert(local_runqueue_minheap_is_empty() == false); - - uint64_t local_deadline = priority_queue_peek(local_runqueue_minheap); - uint64_t global_deadline = global_request_scheduler_peek(); - /* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */ - struct sandbox_request *sandbox_request = NULL; - if (global_deadline < local_deadline) { -#ifdef LOG_PREEMPTION - debuglog("Sandbox %lu has deadline of %lu. Trying to preempt for request with %lu\n", - current_sandbox->id, local_deadline, global_deadline); -#endif - - int return_code = global_request_scheduler_remove_if_earlier(&sandbox_request, local_deadline); - - /* If we were unable to get a sandbox_request, exit */ - if (return_code != 0) { -#ifdef LOG_PREEMPTION - debuglog("Preemption aborted. Another thread took the request\n"); -#endif - /* Assumption: Sandbox request should not be set in case of an error */ - assert(sandbox_request == NULL); - goto done; - } - - assert(sandbox_request->absolute_deadline < local_deadline); - -#ifdef LOG_PREEMPTION - debuglog("Preempted %lu for %lu\n", local_deadline, sandbox_request->absolute_deadline); -#endif - - /* Allocate the request */ - struct sandbox *next_sandbox = sandbox_allocate(sandbox_request); - if (!next_sandbox) goto err_sandbox_allocate; - - /* Set as runnable and add it to the runqueue */ - assert(next_sandbox->state == SANDBOX_INITIALIZED); - sandbox_set_as_runnable(next_sandbox, SANDBOX_INITIALIZED); - - assert(current_sandbox->state == SANDBOX_RUNNING); - sandbox_set_as_preempted(current_sandbox, SANDBOX_RUNNING); - - /* Save the context of the currently executing sandbox before switching from it */ - arch_mcontext_save(¤t_sandbox->ctxt, &user_context->uc_mcontext); - - /* Update current_sandbox to the next sandbox */ - assert(next_sandbox->state == SANDBOX_RUNNABLE); - sandbox_set_as_running(next_sandbox, SANDBOX_RUNNABLE); - - /* - * Restore the context of this new sandbox - * user-level context switch state, so do not enable software interrupts. - * TODO: Review the interrupt logic here. Issue #63 - */ - runtime_worker_threads_deadline[worker_thread_idx] = next_sandbox->absolute_deadline; - assert(!software_interrupt_is_enabled()); - arch_context_restore_new(&user_context->uc_mcontext, &next_sandbox->ctxt); - } -done: - return; -err_sandbox_allocate: - client_socket_send(sandbox_request->socket_descriptor, 503); - client_socket_close(sandbox_request->socket_descriptor, &sandbox_request->socket_address); - debuglog("local_runqueue_minheap_preempt failed to allocate sandbox\n"); -err: - goto done; + return next; } /** @@ -201,8 +93,7 @@ local_runqueue_minheap_initialize() struct local_runqueue_config config = { .add_fn = local_runqueue_minheap_add, .is_empty_fn = local_runqueue_minheap_is_empty, .delete_fn = local_runqueue_minheap_delete, - .get_next_fn = local_runqueue_minheap_get_next, - .preempt_fn = local_runqueue_minheap_preempt }; + .get_next_fn = local_runqueue_minheap_get_next }; local_runqueue_initialize(&config); } diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index b11a948..db0ca57 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -1,25 +1,12 @@ -#include #include -#include -#include #include -#include "admissions_control.h" #include "current_sandbox.h" #include "debuglog.h" -#include "http_parser_settings.h" -#include "http_total.h" -#include "local_completion_queue.h" -#include "local_runqueue.h" -#include "likely.h" #include "panic.h" -#include "runtime.h" -#include "sandbox_exit.h" #include "sandbox_functions.h" #include "sandbox_set_as_error.h" #include "sandbox_set_as_initialized.h" -#include "sandbox_set_as_running.h" -#include "worker_thread.h" /** * Close the sandbox's ith io_handle @@ -240,49 +227,3 @@ err_free_stack_failed: /* Errors freeing memory is a fatal error */ panic("Failed to free Sandbox %lu\n", sandbox->id); } - -/** - * @brief Switches to the next sandbox, placing the current sandbox on the completion queue if in SANDBOX_RETURNED state - * @param next_sandbox The Sandbox Context to switch to - */ -void -sandbox_switch_to(struct sandbox *next_sandbox) -{ - /* Assumption: The caller disables interrupts */ - assert(!software_interrupt_is_enabled()); - - assert(next_sandbox != NULL); - struct arch_context *next_context = &next_sandbox->ctxt; - - /* Get the old sandbox we're switching from. - * This is null if switching from base context - */ - struct sandbox * current_sandbox = current_sandbox_get(); - struct arch_context *current_context = NULL; - if (current_sandbox != NULL) current_context = ¤t_sandbox->ctxt; - - assert(next_sandbox != current_sandbox); - - /* Update the worker's absolute deadline */ - runtime_worker_threads_deadline[worker_thread_idx] = next_sandbox->absolute_deadline; - - if (current_sandbox == NULL) { - /* Switching from "Base Context" */ -#ifdef LOG_CONTEXT_SWITCHES - debuglog("Base Context (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", &worker_thread_base_context, - arch_context_variant_print(worker_thread_base_context.variant), next_sandbox->id, next_context, - arch_context_variant_print(next_context->variant)); -#endif - } else { -#ifdef LOG_CONTEXT_SWITCHES - debuglog("Sandbox %lu (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", current_sandbox->id, - ¤t_sandbox->ctxt, arch_context_variant_print(current_sandbox->ctxt.variant), - next_sandbox->id, &next_sandbox->ctxt, arch_context_variant_print(next_context->variant)); -#endif - - sandbox_exit(current_sandbox); - } - - sandbox_set_as_running(next_sandbox, next_sandbox->state); - arch_context_switch(current_context, next_context); -} diff --git a/runtime/src/software_interrupt.c b/runtime/src/software_interrupt.c index d3c962b..01fe89a 100644 --- a/runtime/src/software_interrupt.c +++ b/runtime/src/software_interrupt.c @@ -18,6 +18,7 @@ #include "panic.h" #include "runtime.h" #include "sandbox_types.h" +#include "scheduler.h" #include "software_interrupt.h" /******************* @@ -124,11 +125,7 @@ sigalrm_handler(siginfo_t *signal_info, ucontext_t *user_context, struct sandbox assert(current_sandbox->state != SANDBOX_RETURNED); /* Preempt */ - local_runqueue_preempt(user_context); - - /* We have to call current_sandbox_get because the argument potentially points to what - * was just preempted */ - if (current_sandbox_get()->ctxt.preemptable) software_interrupt_enable(); + scheduler_preempt(user_context); return; } @@ -204,6 +201,7 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void break; } case SIGUSR1: { + assert(!software_interrupt_is_enabled()); sigusr1_handler(signal_info, user_context, current_sandbox); break; } @@ -220,6 +218,12 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void } } atomic_fetch_sub(&software_interrupt_signal_depth, 1); + + /* Reenable software interrupts if we restored a preemptable sandbox + * We explicitly call current_sandbox_get becaue it might have been changed by a handler + */ + current_sandbox = current_sandbox_get(); + if (current_sandbox && current_sandbox->ctxt.preemptable) software_interrupt_enable(); } /******************** diff --git a/runtime/src/worker_thread.c b/runtime/src/worker_thread.c index 7c54945..2404905 100644 --- a/runtime/src/worker_thread.c +++ b/runtime/src/worker_thread.c @@ -4,22 +4,17 @@ #include #include #include -#include -#include "client_socket.h" #include "current_sandbox.h" -#include "debuglog.h" -#include "global_request_scheduler.h" #include "local_completion_queue.h" #include "local_runqueue.h" #include "local_runqueue_list.h" #include "local_runqueue_minheap.h" #include "panic.h" #include "runtime.h" -#include "sandbox_functions.h" -#include "sandbox_set_as_runnable.h" -#include "sandbox_set_as_error.h" +#include "scheduler.h" #include "worker_thread.h" +#include "worker_thread_execute_epoll_loop.h" /*************************** * Worker Thread State * @@ -37,72 +32,6 @@ __thread int worker_thread_idx; * Worker Thread Logic * **********************/ -/** - * Run all outstanding events in the local thread's epoll loop - */ -static inline void -worker_thread_execute_epoll_loop(void) -{ - assert(software_interrupt_is_disabled); - while (true) { - struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS]; - int descriptor_count = epoll_wait(worker_thread_epoll_file_descriptor, epoll_events, - RUNTIME_MAX_EPOLL_EVENTS, 0); - - if (descriptor_count < 0) { - if (errno == EINTR) continue; - - panic_err(); - } - - if (descriptor_count == 0) break; - - for (int i = 0; i < descriptor_count; i++) { - if (epoll_events[i].events & (EPOLLIN | EPOLLOUT)) { - /* Re-add to runqueue if blocked */ - struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr; - assert(sandbox); - - if (sandbox->state == SANDBOX_BLOCKED) { - sandbox_set_as_runnable(sandbox, SANDBOX_BLOCKED); - } - } else if (epoll_events[i].events & (EPOLLERR | EPOLLHUP)) { - /* Mystery: This seems to never fire. Why? Issue #130 */ - - /* Close socket and set as error on socket error or unexpected client hangup */ - struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr; - int error = 0; - socklen_t errlen = sizeof(error); - getsockopt(epoll_events[i].data.fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - - if (error > 0) { - debuglog("Socket error: %s", strerror(error)); - } else if (epoll_events[i].events & EPOLLHUP) { - debuglog("Client Hungup"); - } else { - debuglog("Unknown Socket error"); - } - - switch (sandbox->state) { - case SANDBOX_SET_AS_RETURNED: - case SANDBOX_RETURNED: - case SANDBOX_SET_AS_COMPLETE: - case SANDBOX_COMPLETE: - case SANDBOX_SET_AS_ERROR: - case SANDBOX_ERROR: - panic("Expected to have closed socket"); - default: - client_socket_send(sandbox->client_socket_descriptor, 503); - sandbox_close_http(sandbox); - sandbox_set_as_error(sandbox, sandbox->state); - } - } else { - panic("Mystery epoll event!\n"); - }; - } - } -} - /** * The entry function for sandbox worker threads * Initializes thread-local state, unmasks signals, sets up epoll loop and @@ -159,8 +88,8 @@ worker_thread_main(void *argument) worker_thread_execute_epoll_loop(); /* Switch to a sandbox if one is ready to run */ - next_sandbox = local_runqueue_get_next(); - if (next_sandbox != NULL) { sandbox_switch_to(next_sandbox); } + next_sandbox = scheduler_get_next(); + if (next_sandbox != NULL) { scheduler_switch_to(next_sandbox); } assert(!software_interrupt_is_enabled()); /* Clear the completion queue */