From 3caecadefe1a29e1663b387bed5f2c74f1770575 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Sat, 2 May 2020 17:13:30 -0400 Subject: [PATCH] feat: Implement peek in ps --- runtime/include/arch/x86_64/context.h | 59 +++++++++++++++---------- runtime/include/sandbox.h | 7 ++- runtime/include/sandbox_run_queue.h | 4 ++ runtime/include/software_interrupt.h | 1 + runtime/src/main.c | 4 +- runtime/src/runtime.c | 4 +- runtime/src/sandbox.c | 2 +- runtime/src/sandbox_request_scheduler.c | 7 +++ runtime/src/sandbox_run_queue.c | 9 +++- runtime/src/sandbox_run_queue_fifo.c | 33 +++++++++++++- runtime/src/sandbox_run_queue_ps.c | 59 ++++++++++++++++++++++++- runtime/src/software_interrupt.c | 24 +++++----- runtime/src/worker_thread.c | 58 +++--------------------- 13 files changed, 173 insertions(+), 98 deletions(-) diff --git a/runtime/include/arch/x86_64/context.h b/runtime/include/arch/x86_64/context.h index 83fd706..28e9509 100644 --- a/runtime/include/arch/x86_64/context.h +++ b/runtime/include/arch/x86_64/context.h @@ -5,6 +5,7 @@ #include #include #include +#include #define ARCH_NREGS (16 /* GP registers */ + 1 /* for IP */) #define ARCH_SIG_JMP_OFF 8 @@ -74,27 +75,38 @@ static void __attribute__((noinline)) arch_context_init(arch_context_t *actx, re *(actx->regs + 16) = ip; } +/** + * Preempt the current sandbox and start executing the next sandbox + * @param mc - the context of the current thread of execution + * @param ctx - the context that we want to restore + * @return {0,1} 0 = context restored successfully. 1 = special processing because thread was last in a user-level + * context switch state + **/ static int arch_mcontext_restore(mcontext_t *mc, arch_context_t *ctx) { assert(ctx != &worker_thread_base_context); - // if ctx->regs[5] is set, this was last in a user-level context switch state! - // else restore mcontext.. - if (ctx->regs[5]) { + // if ctx->regs[5] is set, this was last in a user-level context switch state + bool did_user_level_context_switch = ctx->regs[5]; + if (did_user_level_context_switch) { mc->gregs[REG_RSP] = ctx->regs[5]; mc->gregs[REG_RIP] = ctx->regs[16] + ARCH_SIG_JMP_OFF; ctx->regs[5] = 0; - return 1; } else { + // Restore mcontext memcpy(mc, &ctx->mctx, sizeof(mcontext_t)); memset(&ctx->mctx, 0, sizeof(mcontext_t)); + return 0; } - - return 0; } +/** + * Save the context of the currently executing process + * @param ctx - destination + * @param mc - source + **/ static void arch_mcontext_save(arch_context_t *ctx, mcontext_t *mc) { @@ -104,27 +116,26 @@ arch_mcontext_save(arch_context_t *ctx, mcontext_t *mc) memcpy(&ctx->mctx, mc, sizeof(mcontext_t)); } - +/** + * @param current - the registers and context of the thing running + * @param next - the registers and context of what we're switching to + * @return always returns 0, indicating success + * + * NULL in either of these values indicates the "no sandbox to execute" state, + * which defaults to resuming execution of main + **/ static inline int -arch_context_switch(arch_context_t *ca, arch_context_t *na) +arch_context_switch(arch_context_t *current, arch_context_t *next) { - if (!ca) { - assert(na); - // switching from "no sandbox to execute" state to "executing a sandbox" - ca = &worker_thread_base_context; - } else if (!na) { - assert(ca); - - // switching from "executing a sandbox" to "no execution" state. - na = &worker_thread_base_context; - } else { - assert(na && ca); + // if both current and next are NULL, there is no state change + assert(current != NULL || next != NULL); - // switching between sandboxes. - } + // Set any NULLs to worker_thread_base_context to resume execution of main + if (current == NULL) current = &worker_thread_base_context; + if (next == NULL) next = &worker_thread_base_context; - reg_t *cr = ca->regs, *nr = na->regs; - assert(cr && nr); + reg_t *current_registers = current->regs, *next_registers = next->regs; + assert(current_registers && next_registers); asm volatile("pushq %%rbp\n\t" "movq %%rsp, %%rbp\n\t" @@ -143,7 +154,7 @@ arch_context_switch(arch_context_t *ca, arch_context_t *na) "3:\n\t" "popq %%rbp\n\t" : - : "a"(cr), "b"(nr) + : "a"(current_registers), "b"(next_registers) : "memory", "cc", "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", "xmm0", "xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7", "xmm8", "xmm9", "xmm10", "xmm11", "xmm12", "xmm13", "xmm14", "xmm15"); diff --git a/runtime/include/sandbox.h b/runtime/include/sandbox.h index 1b99a38..fe6a57e 100644 --- a/runtime/include/sandbox.h +++ b/runtime/include/sandbox.h @@ -86,10 +86,9 @@ typedef struct sandbox sandbox_t; extern __thread arch_context_t *worker_thread_next_context; -extern void worker_thread_block_current_sandbox(void); -extern void worker_thread_exit_current_sandbox(void); -extern struct sandbox *worker_thread_get_next_sandbox(void); -extern void worker_thread_process_io(void); +extern void worker_thread_block_current_sandbox(void); +extern void worker_thread_exit_current_sandbox(void); +extern void worker_thread_process_io(void); extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void); extern void worker_thread_wakeup_sandbox(sandbox_t *sandbox); diff --git a/runtime/include/sandbox_run_queue.h b/runtime/include/sandbox_run_queue.h index 873ae45..26d1a25 100644 --- a/runtime/include/sandbox_run_queue.h +++ b/runtime/include/sandbox_run_queue.h @@ -9,12 +9,14 @@ typedef struct sandbox *(*sandbox_run_queue_add_t)(struct sandbox *); typedef struct sandbox *(*sandbox_run_queue_remove_t)(void); typedef bool (*sandbox_run_queue_is_empty_t)(void); typedef void (*sandbox_run_queue_delete_t)(struct sandbox *sandbox); +typedef struct sandbox *(*sandbox_run_queue_get_next_t)(); typedef struct sandbox_run_queue_config_t { sandbox_run_queue_add_t add; sandbox_run_queue_is_empty_t is_empty; sandbox_run_queue_remove_t remove; sandbox_run_queue_delete_t delete; + sandbox_run_queue_get_next_t get_next; } sandbox_run_queue_config_t; @@ -24,5 +26,7 @@ struct sandbox *sandbox_run_queue_add(struct sandbox *); void sandbox_run_queue_delete(struct sandbox *); struct sandbox *sandbox_run_queue_remove(); bool sandbox_run_queue_is_empty(); +struct sandbox *sandbox_run_queue_get_next(); + #endif /* SFRT_SANDBOX_RUN_QUEUE_H */ \ No newline at end of file diff --git a/runtime/include/software_interrupt.h b/runtime/include/software_interrupt.h index ea8d60f..f47b035 100644 --- a/runtime/include/software_interrupt.h +++ b/runtime/include/software_interrupt.h @@ -10,6 +10,7 @@ ***************************************/ extern __thread volatile sig_atomic_t software_interrupt_is_disabled; +extern uint64_t SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES; /*************************************** * Public Static Inlines diff --git a/runtime/src/main.c b/runtime/src/main.c index 2f67d47..ed66fca 100644 --- a/runtime/src/main.c +++ b/runtime/src/main.c @@ -197,7 +197,9 @@ main(int argc, char **argv) memset(runtime_worker_threads, 0, sizeof(pthread_t) * WORKER_THREAD_CORE_COUNT); - runtime_processor_speed_MHz = runtime_get_processor_speed_MHz(); + runtime_processor_speed_MHz = runtime_get_processor_speed_MHz(); + SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES = (uint64_t)SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_USEC + * runtime_processor_speed_MHz; printf("Detected processor speed of %f MHz\n", runtime_processor_speed_MHz); runtime_set_resource_limits_to_max(); diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index d49c5d1..7d79bc0 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -44,8 +44,8 @@ runtime_initialize(void) assert(runtime_epoll_file_descriptor >= 0); // Allocate and Initialize the global deque - // sandbox_request_scheduler_fifo_initialize(); - sandbox_request_scheduler_ps_initialize(); + sandbox_request_scheduler_fifo_initialize(); + // sandbox_request_scheduler_ps_initialize(); // Mask Signals software_interrupt_mask_signal(SIGUSR1); diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index c3f2295..108e895 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -191,7 +191,7 @@ current_sandbox_main(void) // FIXME: is this right? this is the first time this sandbox is running.. so it wont // return to worker_thread_switch_to_sandbox() api.. // we'd potentially do what we'd in worker_thread_switch_to_sandbox() api here for cleanup.. - if (!software_interrupt_is_enabled()) { + if (software_interrupt_is_enabled() == false) { arch_context_init(¤t_sandbox->ctxt, 0, 0); worker_thread_next_context = NULL; software_interrupt_enable(); diff --git a/runtime/src/sandbox_request_scheduler.c b/runtime/src/sandbox_request_scheduler.c index 56caba4..aa04ce7 100644 --- a/runtime/src/sandbox_request_scheduler.c +++ b/runtime/src/sandbox_request_scheduler.c @@ -26,3 +26,10 @@ sandbox_request_scheduler_remove() assert(sandbox_request_scheduler.remove != NULL); return sandbox_request_scheduler.remove(); } + +uint64_t +sandbox_request_scheduler_peek() +{ + assert(sandbox_request_scheduler.peek != NULL); + return sandbox_request_scheduler.peek(); +}; \ No newline at end of file diff --git a/runtime/src/sandbox_run_queue.c b/runtime/src/sandbox_run_queue.c index 534125e..d21f848 100644 --- a/runtime/src/sandbox_run_queue.c +++ b/runtime/src/sandbox_run_queue.c @@ -37,4 +37,11 @@ bool sandbox_run_queue_is_empty() { return sandbox_run_queue_is_empty(); -} \ No newline at end of file +} + +struct sandbox * +sandbox_run_queue_get_next() +{ + assert(sandbox_run_queue.get_next != NULL); + return sandbox_run_queue.get_next(); +}; \ No newline at end of file diff --git a/runtime/src/sandbox_run_queue_fifo.c b/runtime/src/sandbox_run_queue_fifo.c index a8424f8..9074621 100644 --- a/runtime/src/sandbox_run_queue_fifo.c +++ b/runtime/src/sandbox_run_queue_fifo.c @@ -1,5 +1,6 @@ #include "sandbox_run_queue_fifo.h" #include "sandbox_run_queue.h" +#include "sandbox_request_scheduler.h" __thread static struct ps_list_head sandbox_run_queue_fifo; @@ -26,6 +27,35 @@ sandbox_run_queue_fifo_remove(struct sandbox *sandbox_to_remove) ps_list_rem_d(sandbox_to_remove); } +/** + * Execute the sandbox at the head of the thread local runqueue + * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head + * @return the sandbox to execute or NULL if none are available + **/ +struct sandbox * +sandbox_run_queue_fifo_get_next() +{ + if (sandbox_run_queue_is_empty()) { + sandbox_request_t *sandbox_request = sandbox_request_scheduler_remove(); + if (sandbox_request == NULL) return NULL; + struct sandbox *sandbox = sandbox_allocate(sandbox_request); + assert(sandbox); + free(sandbox_request); + sandbox->state = RUNNABLE; + sandbox_run_queue_add(sandbox); + return sandbox; + } + + // Execute Round Robin Scheduling Logic + struct sandbox *next_sandbox = sandbox_run_queue_remove(); + assert(next_sandbox->state != RETURNED); + sandbox_run_queue_add(next_sandbox); + + debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name); + return next_sandbox; +} + + // Append a sandbox to the runqueue struct sandbox * sandbox_run_queue_fifo_append(struct sandbox *sandbox_to_append) @@ -55,7 +85,8 @@ sandbox_run_queue_fifo_initialize() sandbox_run_queue_config_t config = { .add = sandbox_run_queue_fifo_append, .is_empty = sandbox_run_queue_fifo_is_empty, .remove = sandbox_run_queue_fifo_remove_and_return, - .delete = sandbox_run_queue_fifo_remove }; + .delete = sandbox_run_queue_fifo_remove, + .get_next = sandbox_run_queue_fifo_get_next }; sandbox_run_queue_initialize(&config); } \ No newline at end of file diff --git a/runtime/src/sandbox_run_queue_ps.c b/runtime/src/sandbox_run_queue_ps.c index d9cd4c1..6e59be0 100644 --- a/runtime/src/sandbox_run_queue_ps.c +++ b/runtime/src/sandbox_run_queue_ps.c @@ -1,6 +1,7 @@ #include "sandbox_run_queue_ps.h" #include "sandbox_run_queue.h" #include "priority_queue.h" +#include "sandbox_request_scheduler.h" // Local State __thread static struct priority_queue sandbox_run_queue_ps; @@ -46,6 +47,61 @@ sandbox_run_queue_ps_delete(struct sandbox *sandbox) assert(rc != -2); } +/** + * This function determines the next sandbox to run. This is either the head of the runqueue or the + * + * Execute the sandbox at the head of the thread local runqueue + * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head + * @return the sandbox to execute or NULL if none are available + **/ +struct sandbox * +sandbox_run_queue_ps_get_next() +{ + // At any point, we may need to run the head of the request scheduler, the head of the local runqueue, or we + // might want to continue executing the current sandbox. If we want to keep executing the current sandbox, we + // should have a fast path to be able to resume without context switches. + + // If the run queue is empty, we've run the current sandbox to completion + + // We assume that the current sandbox is always on the runqueue when in a runnable state, we know it's the + // highest priority thing on the runqueue. + + // Case 1: Current runqueue is empty, so pull from global queue and add to runqueue + if (sandbox_run_queue_is_empty()) { + sandbox_request_t *sandbox_request = sandbox_request_scheduler_remove(); + if (sandbox_request == NULL) return NULL; + struct sandbox *sandbox = sandbox_allocate(sandbox_request); + assert(sandbox); + free(sandbox_request); + sandbox->state = RUNNABLE; + sandbox_run_queue_add(sandbox); + return sandbox; + } + + // Case 2: Current runqueue is not empty, so compare head of runqueue to head of global request queue and return + // highest priority + + uint64_t global_deadline = sandbox_request_scheduler_peek() - SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES; + // This should be refactored to peek at the top of the runqueue + struct sandbox *head_of_runqueue = sandbox_run_queue_remove(); + uint64_t local_deadline = head_of_runqueue->absolute_deadline; + sandbox_run_queue_add(head_of_runqueue); + + if (local_deadline <= global_deadline) { + return head_of_runqueue; + } else { + sandbox_request_t *sandbox_request = sandbox_request_scheduler_remove(); + struct sandbox * sandbox = sandbox_allocate(sandbox_request); + assert(sandbox); + free(sandbox_request); + sandbox->state = RUNNABLE; + sandbox_run_queue_add(sandbox); + debuglog("[%p: %s]\n", sandbox, sandbox->module->name); + return sandbox; + } +} + + uint64_t sandbox_get_priority(void *element) { @@ -63,7 +119,8 @@ sandbox_run_queue_ps_initialize() sandbox_run_queue_config_t config = { .add = sandbox_run_queue_ps_add, .is_empty = sandbox_run_queue_ps_is_empty, .remove = sandbox_run_queue_ps_remove, - .delete = sandbox_run_queue_ps_delete }; + .delete = sandbox_run_queue_ps_delete, + .get_next = sandbox_run_queue_ps_get_next }; sandbox_run_queue_initialize(&config); } diff --git a/runtime/src/software_interrupt.c b/runtime/src/software_interrupt.c index 2582aa5..439bc6d 100644 --- a/runtime/src/software_interrupt.c +++ b/runtime/src/software_interrupt.c @@ -13,12 +13,14 @@ #include #include #include +#include "sandbox_run_queue.h" /*************************************** * Process Globals ***************************************/ static const int software_interrupt_supported_signals[] = { SIGALRM, SIGUSR1 }; +uint64_t SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES; /*************************************** * Thread Globals @@ -116,29 +118,31 @@ static inline void software_interrupt_schedule_alarm(void *user_context_raw) { software_interrupt_disable(); // no nesting! - - struct sandbox *current_sandbox = current_sandbox_get(); + struct sandbox *current_sandbox = current_sandbox_get(); + bool should_enable_software_interrupt = true; // If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course. if (current_sandbox != NULL) { - // find a next sandbox to run.. - struct sandbox *next_sandbox = worker_thread_get_next_sandbox(); + struct sandbox *next_sandbox = sandbox_run_queue_get_next(); if (next_sandbox != NULL && next_sandbox != current_sandbox) { ucontext_t *user_context = (ucontext_t *)user_context_raw; - // Save context to the sandbox we're switching from + // Save the context of the currently executing sandbox before switching from it arch_mcontext_save(¤t_sandbox->ctxt, &user_context->uc_mcontext); - // current_sandbox_set on it. restore through *user_context.. + // Update current_sandbox to the next sandbox current_sandbox_set(next_sandbox); - if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt)) goto skip; + + // And load the context of this new sandbox + // RC of 1 indicates that sandbox was last in a user-level context switch state, + // so do not enable software interrupts. + if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1) + should_enable_software_interrupt = false; } } - software_interrupt_enable(); -skip: - return; + if (should_enable_software_interrupt) software_interrupt_enable(); } /*************************************** diff --git a/runtime/src/worker_thread.c b/runtime/src/worker_thread.c index 917f77a..f15592b 100644 --- a/runtime/src/worker_thread.c +++ b/runtime/src/worker_thread.c @@ -106,7 +106,7 @@ worker_thread_block_current_sandbox(void) previous_sandbox->state = BLOCKED; // Switch to the next sandbox - struct sandbox *next_sandbox = worker_thread_get_next_sandbox(); + struct sandbox *next_sandbox = sandbox_run_queue_get_next(); debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", previous_sandbox, previous_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : ""); software_interrupt_enable(); @@ -147,32 +147,6 @@ void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_s ; } -/** - * Pulls up to 1..n sandbox requests, allocates them as sandboxes, sets them as runnable and places them on the local - * runqueue, and then frees the sandbox requests The batch size pulled at once is set by SANDBOX_PULL_BATCH_SIZE - * @return the number of sandbox requests pulled - */ -static inline int -worker_thread_pull_and_process_sandbox_requests(void) -{ - int total_sandboxes_pulled = 0; - - while (total_sandboxes_pulled < SANDBOX_PULL_BATCH_SIZE) { - sandbox_request_t *sandbox_request; - if ((sandbox_request = sandbox_request_scheduler_remove()) == NULL) break; - // Actually allocate the sandbox for the requests that we've pulled - struct sandbox *sandbox = sandbox_allocate(sandbox_request); - assert(sandbox); - free(sandbox_request); - // Set the sandbox as runnable and place on the local runqueue - sandbox->state = RUNNABLE; - sandbox_run_queue_add(sandbox); - total_sandboxes_pulled++; - } - - return total_sandboxes_pulled; -} - /** * Run all outstanding events in the local thread's libuv event loop **/ @@ -188,28 +162,6 @@ worker_thread_execute_libuv_event_loop(void) worker_thread_is_in_callback = false; } -/** - * Execute the sandbox at the head of the thread local runqueue - * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head - * @return the sandbox to execute or NULL if none are available - **/ -struct sandbox * -worker_thread_get_next_sandbox() -{ - if (sandbox_run_queue_is_empty()) { - int sandboxes_pulled = worker_thread_pull_and_process_sandbox_requests(); - if (sandboxes_pulled == 0) return NULL; - } - - // Execute Round Robin Scheduling Logic - struct sandbox *next_sandbox = sandbox_run_queue_remove(); - assert(next_sandbox->state != RETURNED); - sandbox_run_queue_add(next_sandbox); - - debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name); - return next_sandbox; -} - /** * The entry function for sandbox worker threads * Initializes thread-local state, unmasks signals, sets up libuv loop and @@ -221,8 +173,8 @@ worker_thread_main(void *return_code) // Initialize Worker State arch_context_init(&worker_thread_base_context, 0, 0); - // sandbox_run_queue_fifo_initialize(); - sandbox_run_queue_ps_initialize(); + sandbox_run_queue_fifo_initialize(); + // sandbox_run_queue_ps_initialize(); sandbox_completion_queue_initialize(); software_interrupt_is_disabled = false; @@ -243,7 +195,7 @@ worker_thread_main(void *return_code) if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop(); software_interrupt_disable(); - next_sandbox = worker_thread_get_next_sandbox(); + next_sandbox = sandbox_run_queue_get_next(); software_interrupt_enable(); if (next_sandbox != NULL) { @@ -272,7 +224,7 @@ worker_thread_exit_current_sandbox(void) sandbox_run_queue_delete(previous_sandbox); previous_sandbox->state = RETURNED; - struct sandbox *next_sandbox = worker_thread_get_next_sandbox(); + struct sandbox *next_sandbox = sandbox_run_queue_get_next(); assert(next_sandbox != previous_sandbox); software_interrupt_enable(); // Because the stack is still in use, only unmap linear memory and defer free resources until "main