chore: Simplify worker scheduling

branch main
Sean McBride committed 5 years ago
parent db2372f02f
commit f5f9c168c6

@@ -88,7 +88,7 @@ extern __thread arch_context_t *worker_thread_next_context;
extern void worker_thread_block_current_sandbox(void);
extern void worker_thread_exit_current_sandbox(void);
extern struct sandbox *worker_thread_get_next_sandbox(bool is_in_interrupt);
extern struct sandbox *worker_thread_get_next_sandbox(void);
extern void worker_thread_process_io(void);
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void);
extern void worker_thread_wakeup_sandbox(sandbox_t *sandbox);

@@ -43,6 +43,7 @@ int runtime_epoll_file_descriptor;
void
runtime_initialize(void)
{
// Set up epoll
runtime_epoll_file_descriptor = epoll_create1(0);
assert(runtime_epoll_file_descriptor >= 0);
@@ -82,6 +83,8 @@ listener_thread_main(void *dummy)
while (true) {
int request_count = epoll_wait(runtime_epoll_file_descriptor, epoll_events,
LISTENER_THREAD_MAX_EPOLL_EVENTS, -1);
// Capture Start Time to calculate absolute deadline
u64 start_time = __getcycles();
for (int i = 0; i < request_count; i++) {
if (epoll_events[i].events & EPOLLERR) {
@@ -89,6 +92,7 @@ listener_thread_main(void *dummy)
assert(false);
}
// Accept Client Request
struct sockaddr_in client_address;
socklen_t client_length = sizeof(client_address);
struct module * module = (struct module *)epoll_events[i].data.ptr;
@@ -101,16 +105,18 @@ listener_thread_main(void *dummy)
}
total_requests++;
// Allocate a Sandbox Request
sandbox_request_t *sandbox_request =
sandbox_request_allocate(module, module->name, socket_descriptor,
(const struct sockaddr *)&client_address, start_time);
assert(sandbox_request);
// Add to the Global Sandbox Request Scheduler
sandbox_request_scheduler_add(sandbox_request);
}
}
free(epoll_events);
return NULL;
}
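
The start time captured above is only one operand of the deadline computation; here is a minimal sketch of how an absolute deadline could be derived from it. The struct module_timing type, its relative_deadline_cycles field, and request_absolute_deadline are hypothetical names for illustration, not part of this diff or the codebase's API.

#include <stdint.h>

/* Hypothetical module descriptor; the field name is an assumption */
struct module_timing {
        uint64_t relative_deadline_cycles; /* deadline relative to request arrival, in TSC cycles */
};

/* Absolute deadline = arrival timestamp (in cycles) + the module's relative deadline */
static inline uint64_t
request_absolute_deadline(uint64_t start_time, const struct module_timing *module)
{
        return start_time + module->relative_deadline_cycles;
}
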
@@ -167,14 +173,24 @@ static inline void
worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
{
arch_context_t *next_register_context = next_sandbox == NULL ? NULL : &next_sandbox->ctxt;
software_interrupt_disable();
struct sandbox *current_sandbox = current_sandbox_get();
arch_context_t *current_register_context = current_sandbox == NULL ? NULL : &current_sandbox->ctxt;
// Get the old sandbox we're switching from
struct sandbox *previous_sandbox = current_sandbox_get();
arch_context_t *previous_register_context = previous_sandbox == NULL ? NULL : &previous_sandbox->ctxt;
// Set the current sandbox to the next
current_sandbox_set(next_sandbox);
// If the current sandbox we're switching from is in a RETURNED state, add to completion queue
if (current_sandbox && current_sandbox->state == RETURNED) sandbox_completion_queue_add(current_sandbox);
// and switch to the associated context. The SIGALRM handler checks worker_thread_next_context and skips preemption while it is set
worker_thread_next_context = next_register_context;
arch_context_switch(current_register_context, next_register_context);
arch_context_switch(previous_register_context, next_register_context);
// If the current sandbox we're switching from is in a RETURNED state, add to completion queue
if (previous_sandbox != NULL && previous_sandbox->state == RETURNED)
sandbox_completion_queue_add(previous_sandbox);
software_interrupt_enable();
}
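
For readers unfamiliar with the mechanism, the save-then-resume discipline that worker_thread_switch_to_sandbox follows can be illustrated with plain POSIX ucontext. The runtime uses its own arch_context_switch, so this standalone program is only an analogue under that assumption, not the actual implementation; scheduler_context, task_context, and task_body are illustrative names.

#include <stdio.h>
#include <ucontext.h>

static ucontext_t scheduler_context, task_context;

static void
task_body(void)
{
        puts("running inside the task context");
        /* Returning here falls through to uc_link, i.e. back to the scheduler context */
}

int
main(void)
{
        static char task_stack[64 * 1024];

        getcontext(&task_context);
        task_context.uc_stack.ss_sp   = task_stack;
        task_context.uc_stack.ss_size = sizeof(task_stack);
        task_context.uc_link          = &scheduler_context;
        makecontext(&task_context, task_body, 0);

        /* Save the current ("previous") context and resume the next one,
         * mirroring arch_context_switch(previous_register_context, next_register_context) above */
        swapcontext(&scheduler_context, &task_context);
        puts("back in the scheduler context");
        return 0;
}
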
@@ -187,10 +203,10 @@ worker_thread_wakeup_sandbox(sandbox_t *sandbox)
{
software_interrupt_disable();
// debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
if (sandbox->state != BLOCKED) goto done;
if (sandbox->state == BLOCKED) {
sandbox->state = RUNNABLE;
sandbox_run_queue_append(sandbox);
done:
}
software_interrupt_enable();
}
@@ -205,13 +221,14 @@ worker_thread_block_current_sandbox(void)
assert(worker_thread_is_in_callback == false);
software_interrupt_disable();
struct sandbox *current_sandbox = current_sandbox_get();
sandbox_run_queue_remove(current_sandbox);
current_sandbox->state = BLOCKED;
// Remove the sandbox we were just executing from the runqueue and mark as blocked
struct sandbox *previous_sandbox = current_sandbox_get();
sandbox_run_queue_remove(previous_sandbox);
previous_sandbox->state = BLOCKED;
struct sandbox *next_sandbox = worker_thread_get_next_sandbox(false);
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name,
// Switch to the next sandbox
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", previous_sandbox, previous_sandbox->module->name,
next_sandbox, next_sandbox ? next_sandbox->module->name : "");
software_interrupt_enable();
worker_thread_switch_to_sandbox(next_sandbox);
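
Taken together, the wakeup and block paths above form a small state machine over the sandbox states visible in this diff (RUNNABLE, BLOCKED, RETURNED). The sketch below assumes, as the assertions in worker_thread_main suggest, that a sandbox is RUNNABLE while it executes; the real runtime may define additional states, and sandbox_state_transition_is_legal is an illustrative helper, not a runtime function.

#include <assert.h>
#include <stdbool.h>

/* States as they appear in this diff; other states in the runtime are omitted */
typedef enum { RUNNABLE, BLOCKED, RETURNED } sandbox_state_t;

static bool
sandbox_state_transition_is_legal(sandbox_state_t from, sandbox_state_t to)
{
        switch (to) {
        case RUNNABLE:
                return from == BLOCKED;  /* worker_thread_wakeup_sandbox */
        case BLOCKED:
                return from == RUNNABLE; /* worker_thread_block_current_sandbox */
        case RETURNED:
                return from == RUNNABLE; /* worker_thread_exit_current_sandbox */
        }
        return false;
}

int
main(void)
{
        assert(sandbox_state_transition_is_legal(BLOCKED, RUNNABLE));
        assert(!sandbox_state_transition_is_legal(RETURNED, RUNNABLE));
        return 0;
}
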
@@ -295,47 +312,26 @@ worker_thread_execute_libuv_event_loop(void)
/**
* Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
* @param in_interrupt if this is getting called in the context of an interrupt
* @return the sandbox to execute or NULL if none are available
**/
struct sandbox *
worker_thread_get_next_sandbox(bool is_in_interrupt)
worker_thread_get_next_sandbox()
{
// If the thread local runqueue is empty and we're not running in the context of an interrupt,
// pull a fresh batch of sandbox requests from the global queue
if (sandbox_run_queue_is_empty()) {
if (is_in_interrupt) return NULL;
if (worker_thread_pull_and_process_sandbox_requests() == 0) {
// debuglog("[null: null]\n");
return NULL;
}
int sandboxes_pulled = worker_thread_pull_and_process_sandbox_requests();
if (sandboxes_pulled == 0) return NULL;
}
// Execute Round Robin Scheduling Logic
// Grab the sandbox at the head of the thread local runqueue, add it to the end, and return it
struct sandbox *sandbox = sandbox_run_queue_get_head();
// We are assuming that any sandbox in the RETURNED state has already been pulled from the local
// runqueue by now!
assert(sandbox->state != RETURNED);
ps_list_rem_d(sandbox);
sandbox_run_queue_append(sandbox);
debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
return sandbox;
}
struct sandbox *next_sandbox = sandbox_run_queue_get_head();
assert(next_sandbox->state != RETURNED);
sandbox_run_queue_remove(next_sandbox);
sandbox_run_queue_append(next_sandbox);
/**
* Tries to free a completed request and executes libuv callbacks
**/
static inline void
worker_thread_execute_runtime_maintenance(void)
{
assert(current_sandbox_get() == NULL);
sandbox_completion_queue_free(1);
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name);
return next_sandbox;
}
/**
* The entry function for sandbox worker threads
* Initializes thread-local state, unmasks signals, sets up libuv loop and
@@ -344,8 +340,8 @@ worker_thread_execute_runtime_maintenance(void)
void *
worker_thread_main(void *return_code)
{
// Initialize Worker State
arch_context_init(&worker_thread_base_context, 0, 0);
sandbox_run_queue_initialize();
sandbox_completion_queue_initialize();
software_interrupt_is_disabled = false;
@@ -357,19 +353,21 @@ worker_thread_main(void *return_code)
uv_loop_init(&worker_thread_uvio_handle);
worker_thread_is_in_callback = false;
// Begin Worker Execution Loop
struct sandbox *next_sandbox;
while (true) {
worker_thread_execute_runtime_maintenance();
software_interrupt_disable();
struct sandbox *sandbox = worker_thread_get_next_sandbox(false);
software_interrupt_enable();
assert(sandbox == NULL || sandbox->state == RUNNABLE);
while (sandbox) {
worker_thread_switch_to_sandbox(sandbox);
worker_thread_execute_runtime_maintenance();
assert(current_sandbox_get() == NULL);
// If "in a callback", the libuv event loop is triggering this, so we don't need to start it
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
software_interrupt_disable();
sandbox = worker_thread_get_next_sandbox(false);
next_sandbox = worker_thread_get_next_sandbox();
software_interrupt_enable();
assert(sandbox == NULL || sandbox->state == RUNNABLE);
if (next_sandbox != NULL) {
worker_thread_switch_to_sandbox(next_sandbox);
sandbox_completion_queue_free(1);
}
}
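
Each iteration of the loop above asks worker_thread_get_next_sandbox for work, and the policy behind that function is plain round robin: take the sandbox at the head of the thread-local runqueue, re-append it at the tail, and run it. Below is a self-contained sketch of that rotation on a minimal intrusive circular list; the runtime's ps_list and sandbox_run_queue helpers are not reproduced, and struct node, runqueue_append, and runqueue_rotate are illustrative names only.

#include <stdio.h>

/* Minimal intrusive circular doubly linked list standing in for the thread-local runqueue */
struct node {
        int          id;
        struct node *prev, *next;
};

static struct node sentinel = { .id = -1, .prev = &sentinel, .next = &sentinel };

static void
runqueue_append(struct node *n)
{
        n->prev             = sentinel.prev;
        n->next             = &sentinel;
        sentinel.prev->next = n;
        sentinel.prev       = n;
}

/* Round robin: take the head, move it to the tail, and hand it out */
static struct node *
runqueue_rotate(void)
{
        struct node *head = sentinel.next;
        if (head == &sentinel) return NULL; /* empty runqueue */
        head->prev->next = head->next;
        head->next->prev = head->prev;
        runqueue_append(head);
        return head;
}

int
main(void)
{
        struct node a = { .id = 1 }, b = { .id = 2 };
        runqueue_append(&a);
        runqueue_append(&b);
        for (int i = 0; i < 4; i++) printf("next: %d\n", runqueue_rotate()->id); /* prints 1 2 1 2 */
        return 0;
}
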
@@ -386,17 +384,18 @@ worker_thread_main(void *return_code)
void
worker_thread_exit_current_sandbox(void)
{
struct sandbox *current_sandbox = current_sandbox_get();
assert(current_sandbox);
// Remove the sandbox that exited from the runqueue and set state to RETURNED
struct sandbox *previous_sandbox = current_sandbox_get();
assert(previous_sandbox);
software_interrupt_disable();
sandbox_run_queue_remove(current_sandbox);
current_sandbox->state = RETURNED;
sandbox_run_queue_remove(previous_sandbox);
previous_sandbox->state = RETURNED;
struct sandbox *next_sandbox = worker_thread_get_next_sandbox(true);
assert(next_sandbox != current_sandbox);
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
assert(next_sandbox != previous_sandbox);
software_interrupt_enable();
// free resources from "main function execution", as stack still in use.
// unmap linear memory only!
munmap(current_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
// Because the stack is still in use, only unmap linear memory here and defer freeing the remaining
// resources until after "main function execution"
munmap(previous_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
worker_thread_switch_to_sandbox(next_sandbox);
}
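
A minimal sketch of the mapping that the munmap call above releases: a sandbox's linear memory plus one extra page, sized to match SBOX_MAX_MEM + PAGE_SIZE. The EXAMPLE_* constants are stand-ins, since the real SBOX_MAX_MEM and PAGE_SIZE values and the sandbox allocator are not part of this diff.

#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>

#define EXAMPLE_LINEAR_MEMORY_SIZE (1UL << 20) /* illustrative linear memory size */
#define EXAMPLE_PAGE_SIZE          4096UL      /* illustrative trailing page */

int
main(void)
{
        size_t total = EXAMPLE_LINEAR_MEMORY_SIZE + EXAMPLE_PAGE_SIZE;

        /* Reserve a region mirroring the SBOX_MAX_MEM + PAGE_SIZE size unmapped in the hunk above */
        void *linear_memory = mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
        assert(linear_memory != MAP_FAILED);

        /* On sandbox exit, only this mapping is released; the stack stays mapped because it is still in use */
        munmap(linear_memory, total);
        return 0;
}
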

@@ -55,24 +55,22 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
#ifdef PREEMPT_DISABLE
assert(0);
#else
struct sandbox *current_sandbox = current_sandbox_get();
ucontext_t * user_context = (ucontext_t *)user_context_raw;
struct sandbox *current_sandbox = current_sandbox_get();
switch (signal_type) {
case SIGALRM: {
// if interrupts are disabled.. increment a per_thread counter and return
// SIGALRM is the preemption signal that occurs every quantum of execution
if (signal_info->si_code == SI_KERNEL) {
int rt = 0;
// deliver signal to all other runtime threads..
// deliver signal to all other worker threads..
for (int i = 0; i < WORKER_THREAD_CORE_COUNT; i++) {
if (pthread_self() == runtime_worker_threads[i]) {
rt = 1;
continue;
}
if (pthread_self() == runtime_worker_threads[i]) continue;
pthread_kill(runtime_worker_threads[i], SIGALRM);
}
assert(rt == 1);
} else {
// SI_TKILL indicates the signal was forwarded from another worker thread via pthread_kill
assert(signal_info->si_code == SI_TKILL);
}
// debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
@@ -82,7 +80,7 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
if (current_sandbox && current_sandbox->state == RETURNED) return;
if (worker_thread_next_context) return;
if (!software_interrupt_is_enabled()) return;
software_interrupt_schedule_alarm(user_context_raw);
software_interrupt_schedule_alarm(user_context);
break;
}
@@ -120,28 +118,24 @@ software_interrupt_schedule_alarm(void *user_context_raw)
software_interrupt_disable(); // no nesting!
struct sandbox *current_sandbox = current_sandbox_get();
ucontext_t * user_context = (ucontext_t *)user_context_raw;
// no sandboxes running..so nothing to preempt..let the "main" scheduler run its course.
if (current_sandbox == NULL) goto done;
// If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course.
if (current_sandbox != NULL) {
// find a next sandbox to run..
struct sandbox *next_sandbox = worker_thread_get_next_sandbox(true);
if (next_sandbox == NULL) goto done;
if (next_sandbox == current_sandbox) goto done; // only this sandbox to schedule.. return to it!
// save the current sandbox, state from user_context!
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
if (next_sandbox != NULL && next_sandbox != current_sandbox) {
ucontext_t *user_context = (ucontext_t *)user_context_raw;
// Save context to the sandbox we're switching from
arch_mcontext_save(&current_sandbox->ctxt, &user_context->uc_mcontext);
// current_sandbox_set on it. restore through *user_context..
current_sandbox_set(next_sandbox);
if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt)) goto skip;
// reset if SIGALRM happens before SIGUSR1 and we don't preempt, OR
// perhaps switch here for SIGUSR1 and see if we can clear that signal
// so it doesn't get called on SIGALRM return
// worker_thread_next_context = NULL;
}
}
done:
software_interrupt_enable();
skip:
return;
