#include #include #include #include #include #include #include "current_sandbox.h" #include "global_request_scheduler.h" #include "local_completion_queue.h" #include "local_runqueue.h" #include "local_runqueue_list.h" #include "local_runqueue_minheap.h" #include "panic.h" #include "runtime.h" #include "types.h" #include "worker_thread.h" /*************************** * Worker Thread State * **************************/ /* context of the runtime thread before running sandboxes or to resume its "main". */ __thread struct arch_context worker_thread_base_context; /* libuv i/o loop handle per sandboxing thread! */ __thread uv_loop_t worker_thread_uvio_handle; /* Flag to signify if the thread is currently running callbacks in the libuv event loop */ static __thread bool worker_thread_is_in_libuv_event_loop = false; /*********************** * Worker Thread Logic * **********************/ /** * Conditionally triggers appropriate state changes for exiting sandboxes * @param exiting_sandbox - The sandbox that ran to completion */ static inline void worker_thread_transition_exiting_sandbox(struct sandbox *exiting_sandbox) { assert(exiting_sandbox != NULL); switch (exiting_sandbox->state) { case SANDBOX_RETURNED: /* * We draw a distinction between RETURNED and COMPLETED because a sandbox cannot add itself to the * completion queue */ sandbox_set_as_complete(exiting_sandbox); break; case SANDBOX_ERROR: /* Terminal State, so just break */ break; default: panic("Cooperatively switching from a sandbox in a non-terminal %s state\n", sandbox_state_stringify(exiting_sandbox->state)); } } /** * @brief Switches to the next sandbox, placing the current sandbox on the completion queue if in SANDBOX_RETURNED state * @param next_sandbox The Sandbox Context to switch to * * TODO: Confirm that this can gracefully resume sandboxes in a PREEMPTED state */ static inline void worker_thread_switch_to_sandbox(struct sandbox *next_sandbox) { /* Assumption: The caller disables interrupts */ assert(software_interrupt_is_disabled); assert(next_sandbox != NULL); struct arch_context *next_context = &next_sandbox->ctxt; /* Get the old sandbox we're switching from */ struct sandbox *current_sandbox = current_sandbox_get(); if (current_sandbox == NULL) { /* Switching from "Base Context" */ sandbox_set_as_running(next_sandbox); debuglog("Base Context (%s) > Sandbox %lu (%s)\n", arch_context_variant_print(worker_thread_base_context.variant), next_sandbox->request_arrival_timestamp, arch_context_variant_print(next_context->variant)); arch_context_switch(NULL, next_context); } else { /* Set the current sandbox to the next */ assert(next_sandbox != current_sandbox); worker_thread_transition_exiting_sandbox(current_sandbox); sandbox_set_as_running(next_sandbox); struct arch_context *current_context = ¤t_sandbox->ctxt; debuglog("Sandbox %lu > Sandbox %lu\n", current_sandbox->request_arrival_timestamp, next_sandbox->request_arrival_timestamp); /* Switch to the associated context. */ arch_context_switch(current_context, next_context); } software_interrupt_enable(); } /** * @brief Switches to the base context, placing the current sandbox on the completion queue if in RETURNED state */ static inline void worker_thread_switch_to_base_context() { assert(!software_interrupt_is_enabled()); struct sandbox *current_sandbox = current_sandbox_get(); /* Assumption: Base Context should never switch to Base Context */ assert(current_sandbox != NULL); assert(¤t_sandbox->ctxt != &worker_thread_base_context); current_sandbox_set(NULL); debuglog("Sandbox %lu (%s) > Base Context (%s)\n", current_sandbox->request_arrival_timestamp, arch_context_variant_print(current_sandbox->ctxt.variant), arch_context_variant_print(worker_thread_base_context.variant)); arch_context_switch(¤t_sandbox->ctxt, &worker_thread_base_context); software_interrupt_enable(); } /** * Mark a blocked sandbox as runnable and add it to the runqueue * @param sandbox the sandbox to check and update if blocked */ void worker_thread_wakeup_sandbox(struct sandbox *sandbox) { assert(sandbox != NULL); assert(sandbox->state == SANDBOX_BLOCKED); software_interrupt_disable(); sandbox_set_as_runnable(sandbox); software_interrupt_enable(); } /** * Mark the currently executing sandbox as blocked, remove it from the local runqueue, * and pull the sandbox at the head of the runqueue */ void worker_thread_block_current_sandbox(void) { assert(worker_thread_is_in_libuv_event_loop == false); software_interrupt_disable(); /* Remove the sandbox we were just executing from the runqueue and mark as blocked */ struct sandbox *current_sandbox = current_sandbox_get(); local_runqueue_delete(current_sandbox); current_sandbox->state = SANDBOX_BLOCKED; /* Switch to the next sandbox */ struct sandbox *next_sandbox = local_runqueue_get_next(); debuglog("[%p: %p, %p: %p]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : ""); /* If able to get one, switch to it. Otherwise, return to base context */ if (next_sandbox == NULL) { worker_thread_switch_to_base_context(); } else { debuglog("[%p: %p, %p: %p]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : ""); worker_thread_switch_to_sandbox(next_sandbox); } } /** * Execute I/O */ void worker_thread_process_io(void) { #ifdef USE_HTTP_UVIO #ifdef USE_HTTP_SYNC /* realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, * not great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, * return else do async block! */ uv_run(worker_thread_get_libuv_handle(), UV_RUN_DEFAULT); #else /* USE_HTTP_SYNC */ worker_thread_block_current_sandbox(); #endif /* USE_HTTP_UVIO */ #else assert(false); /* it should not be called if not using uvio for http */ #endif } /** * Run all outstanding events in the local thread's libuv event loop */ void worker_thread_execute_libuv_event_loop(void) { worker_thread_is_in_libuv_event_loop = true; int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT), i = 0; while (n > 0) { n--; uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT); } worker_thread_is_in_libuv_event_loop = false; } /** * The entry function for sandbox worker threads * Initializes thread-local state, unmasks signals, sets up libuv loop and * @param return_code - argument provided by pthread API. We set to -1 on error */ void * worker_thread_main(void *return_code) { /* Initialize Base Context */ arch_context_init(&worker_thread_base_context, 0, 0); /* Initialize Runqueue Variant */ // local_runqueue_list_initialize(); local_runqueue_minheap_initialize(); /* Initialize Completion Queue */ local_completion_queue_initialize(); /* Initialize Flags */ software_interrupt_is_disabled = false; worker_thread_is_in_libuv_event_loop = false; /* Unmask signals */ #ifndef PREEMPT_DISABLE software_interrupt_unmask_signal(SIGALRM); software_interrupt_unmask_signal(SIGUSR1); #endif /* Initialize libuv event loop handle */ uv_loop_init(&worker_thread_uvio_handle); /* Begin Worker Execution Loop */ struct sandbox *next_sandbox; while (true) { /* Assumption: current_sandbox should be unset at start of loop */ assert(current_sandbox_get() == NULL); /* Execute libuv event loop */ if (!worker_thread_is_in_libuv_event_loop) worker_thread_execute_libuv_event_loop(); /* Switch to a sandbox if one is ready to run */ software_interrupt_disable(); next_sandbox = local_runqueue_get_next(); if (next_sandbox != NULL) { worker_thread_switch_to_sandbox(next_sandbox); } else { software_interrupt_enable(); }; /* Clear the completion queue */ local_completion_queue_free(); } panic("Worker Thread unexpectedly completed run loop."); } /** * Called when the function in the sandbox exits * Removes the standbox from the thread-local runqueue, sets its state to SANDBOX_RETURNED, * releases the linear memory, and then returns to the base context * TODO: Consider moving this to a future current_sandbox file. This has thus far proven difficult to move */ __attribute__((noreturn)) void worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox) { assert(exiting_sandbox); debuglog("Exiting %lu\n", exiting_sandbox->request_arrival_timestamp); /* Because the stack is still in use, only unmap linear memory and defer free resources until "main function execution" */ sandbox_free_linear_memory(exiting_sandbox); /* TODO: I do not understand when software interrupts must be disabled? */ software_interrupt_disable(); local_runqueue_delete(exiting_sandbox); exiting_sandbox->state = SANDBOX_RETURNED; sandbox_print_perf(exiting_sandbox); local_completion_queue_add(exiting_sandbox); /* This should force return to main event loop */ worker_thread_switch_to_base_context(); assert(0); }