feat: rework of scheduler logic

main
Sean McBride 4 years ago
parent 91c429cd8f
commit 0f0d0fcb18

@ -76,14 +76,16 @@
"local_runqueue.h": "c", "local_runqueue.h": "c",
"software_interrupt.h": "c", "software_interrupt.h": "c",
"sandbox_set_as_runnable.h": "c", "sandbox_set_as_runnable.h": "c",
"current_sandbox_yield.h": "c",
"sandbox_set_as_complete.h": "c", "sandbox_set_as_complete.h": "c",
"deque.h": "c", "deque.h": "c",
"sandbox_request.h": "c", "sandbox_request.h": "c",
"sandbox_send_response.h": "c", "sandbox_send_response.h": "c",
"sandbox_setup_arguments.h": "c", "sandbox_setup_arguments.h": "c",
"worker_thread.h": "c", "worker_thread.h": "c",
"sandbox_set_as_returned.h": "c" "sandbox_set_as_error.h": "c",
"likely.h": "c",
"debuglog.h": "c",
"worker_thread_execute_epoll_loop.h": "c"
}, },
"files.exclude": { "files.exclude": {
"**/.git": true, "**/.git": true,
@ -138,4 +140,4 @@
"*.bc": true, "*.bc": true,
"*.wasm": true, "*.wasm": true,
} }
} }

@ -56,9 +56,6 @@ arch_mcontext_restore(mcontext_t *active_context, struct arch_context *sandbox_c
/* Restore mcontext */ /* Restore mcontext */
memcpy(active_context, &sandbox_context->mctx, sizeof(mcontext_t)); memcpy(active_context, &sandbox_context->mctx, sizeof(mcontext_t));
/* Reenable software interrupts if we restored a preemptable sandbox */
if (sandbox_context->preemptable) software_interrupt_enable();
} }

@ -52,7 +52,6 @@ arch_context_init(struct arch_context *actx, reg_t ip, reg_t sp)
/** /**
* Load a new sandbox that preempted an existing sandbox, restoring only the * Load a new sandbox that preempted an existing sandbox, restoring only the
* instruction pointer and stack pointer registers. * instruction pointer and stack pointer registers.
* I am unclear about setting the BP. Issue #131
* @param active_context - the context of the current worker thread * @param active_context - the context of the current worker thread
* @param sandbox_context - the context that we want to restore * @param sandbox_context - the context that we want to restore
*/ */

@ -1,42 +0,0 @@
#pragma once
#include <assert.h>
#include <stddef.h>
#include "current_sandbox.h"
#include "current_sandbox_yield.h"
#include "generic_thread.h"
#include "local_runqueue.h"
#include "sandbox_set_as_blocked.h"
#include "software_interrupt.h"
/**
* Mark the currently executing sandbox as blocked, remove it from the local runqueue,
* and switch to base context
*/
static inline void
current_sandbox_block(void)
{
/* Remove the sandbox we were just executing from the runqueue and mark as blocked */
struct sandbox *current_sandbox = current_sandbox_get();
/* We might either have blocked in start reading the request or while executing within the WebAssembly
* entrypoint. The preemptable flag on the context is used to differentiate. In either case, we should
* have disabled interrupts.
*/
if (current_sandbox->ctxt.preemptable) software_interrupt_disable();
assert(!software_interrupt_is_enabled());
assert(current_sandbox->state == SANDBOX_RUNNING);
sandbox_set_as_blocked(current_sandbox, SANDBOX_RUNNING);
generic_thread_dump_lock_overhead();
/* The worker thread seems to "spin" on a blocked sandbox, so try to execute another sandbox for one quantum
* after blocking to give time for the action to resolve */
struct sandbox *next_sandbox = local_runqueue_get_next();
if (next_sandbox != NULL) {
sandbox_switch_to(next_sandbox);
} else {
current_sandbox_yield();
};
}

@ -1,45 +0,0 @@
#pragma once
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include "arch/context.h"
#include "current_sandbox.h"
#include "sandbox_types.h"
#include "sandbox_exit.h"
#include "software_interrupt.h"
/**
* @brief Switches to the base context, placing the current sandbox on the completion queue if in RETURNED state
*/
static inline void
current_sandbox_yield()
{
assert(!software_interrupt_is_enabled());
struct sandbox *current_sandbox = current_sandbox_get();
#ifndef NDEBUG
if (current_sandbox != NULL) {
assert(current_sandbox->state < SANDBOX_STATE_COUNT);
assert(current_sandbox->stack_size == current_sandbox->module->stack_size);
}
#endif
/* Assumption: Base Context should never switch to Base Context */
assert(current_sandbox != NULL);
struct arch_context *current_context = &current_sandbox->ctxt;
assert(current_context != &worker_thread_base_context);
#ifdef LOG_CONTEXT_SWITCHES
debuglog("Sandbox %lu (@%p) (%s) > Base Context (@%p) (%s)\n", current_sandbox->id, current_context,
arch_context_variant_print(current_sandbox->ctxt.variant), &worker_thread_base_context,
arch_context_variant_print(worker_thread_base_context.variant));
#endif
sandbox_exit(current_sandbox);
current_sandbox_set(NULL);
assert(worker_thread_base_context.variant == ARCH_CONTEXT_VARIANT_FAST);
runtime_worker_threads_deadline[worker_thread_idx] = UINT64_MAX;
arch_context_switch(current_context, &worker_thread_base_context);
}

@ -9,14 +9,12 @@ typedef void (*local_runqueue_add_fn_t)(struct sandbox *);
typedef bool (*local_runqueue_is_empty_fn_t)(void); typedef bool (*local_runqueue_is_empty_fn_t)(void);
typedef void (*local_runqueue_delete_fn_t)(struct sandbox *sandbox); typedef void (*local_runqueue_delete_fn_t)(struct sandbox *sandbox);
typedef struct sandbox *(*local_runqueue_get_next_fn_t)(); typedef struct sandbox *(*local_runqueue_get_next_fn_t)();
typedef void (*local_runqueue_preempt_fn_t)(ucontext_t *);
struct local_runqueue_config { struct local_runqueue_config {
local_runqueue_add_fn_t add_fn; local_runqueue_add_fn_t add_fn;
local_runqueue_is_empty_fn_t is_empty_fn; local_runqueue_is_empty_fn_t is_empty_fn;
local_runqueue_delete_fn_t delete_fn; local_runqueue_delete_fn_t delete_fn;
local_runqueue_get_next_fn_t get_next_fn; local_runqueue_get_next_fn_t get_next_fn;
local_runqueue_preempt_fn_t preempt_fn;
}; };
void local_runqueue_add(struct sandbox *); void local_runqueue_add(struct sandbox *);
@ -24,4 +22,3 @@ void local_runqueue_delete(struct sandbox *);
bool local_runqueue_is_empty(); bool local_runqueue_is_empty();
struct sandbox *local_runqueue_get_next(); struct sandbox *local_runqueue_get_next();
void local_runqueue_initialize(struct local_runqueue_config *config); void local_runqueue_initialize(struct local_runqueue_config *config);
void local_runqueue_preempt(ucontext_t *);

@ -8,13 +8,13 @@
#include <unistd.h> #include <unistd.h>
#include "current_sandbox.h" #include "current_sandbox.h"
#include "current_sandbox_block.h"
#include "debuglog.h" #include "debuglog.h"
#include "http_parser.h" #include "http_parser.h"
#include "http_request.h" #include "http_request.h"
#include "http_parser_settings.h" #include "http_parser_settings.h"
#include "likely.h" #include "likely.h"
#include "sandbox_types.h" #include "sandbox_types.h"
#include "scheduler.h"
/** /**
* Receive and Parse the Request for the current sandbox * Receive and Parse the Request for the current sandbox
@ -44,7 +44,7 @@ sandbox_receive_request(struct sandbox *sandbox)
if (recved < 0) { if (recved < 0) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
current_sandbox_block(); scheduler_block();
continue; continue;
} else { } else {
/* All other errors */ /* All other errors */

@ -9,11 +9,11 @@
#include <unistd.h> #include <unistd.h>
#include "current_sandbox.h" #include "current_sandbox.h"
#include "current_sandbox_block.h"
#include "http.h" #include "http.h"
#include "http_total.h" #include "http_total.h"
#include "likely.h" #include "likely.h"
#include "sandbox_types.h" #include "sandbox_types.h"
#include "scheduler.h"
#include "panic.h" #include "panic.h"
/** /**
@ -105,7 +105,7 @@ sandbox_send_response(struct sandbox *sandbox)
response_cursor - sent); response_cursor - sent);
if (rc < 0) { if (rc < 0) {
if (errno == EAGAIN) if (errno == EAGAIN)
current_sandbox_block(); scheduler_block();
else { else {
perror("write"); perror("write");
return -1; return -1;

@ -0,0 +1,198 @@
#pragma once
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include "client_socket.h"
#include "global_request_scheduler.h"
#include "local_runqueue.h"
#include "sandbox_request.h"
#include "sandbox_exit.h"
#include "sandbox_functions.h"
#include "sandbox_types.h"
#include "sandbox_set_as_blocked.h"
#include "sandbox_set_as_preempted.h"
#include "sandbox_set_as_runnable.h"
#include "sandbox_set_as_running.h"
#include "worker_thread_execute_epoll_loop.h"
static inline struct sandbox *
scheduler_get_next()
{
assert(!software_interrupt_is_enabled());
/* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox *local = local_runqueue_get_next();
uint64_t local_deadline = local == NULL ? UINT64_MAX : local->absolute_deadline;
uint64_t global_deadline = global_request_scheduler_peek();
/* Try to pull and allocate from the global queue if earlier
* This will be placed at the head of the local runqueue */
if (global_deadline < local_deadline) {
struct sandbox_request *request = NULL;
int return_code = global_request_scheduler_remove_if_earlier(&request, local_deadline);
if (return_code == 0) {
assert(request != NULL);
assert(request->absolute_deadline < local_deadline);
struct sandbox *global = sandbox_allocate(request);
if (!global) {
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
debuglog("scheduler failed to allocate sandbox\n");
} else {
assert(global->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
}
}
}
/* Return what is at the head of the local runqueue or NULL if empty */
return local_runqueue_get_next();
}
/**
* Called by the SIGALRM handler after a quantum
* Assumes the caller validates that there is something to preempt
* @param user_context - The context of our user-level Worker thread
*/
static inline void
scheduler_preempt(ucontext_t *user_context)
{
assert(user_context != NULL);
assert(!software_interrupt_is_enabled());
/* Process epoll to make sure that all runnable jobs are considered for execution */
worker_thread_execute_epoll_loop();
struct sandbox *current = current_sandbox_get();
assert(current != NULL);
assert(current->state == SANDBOX_RUNNING);
struct sandbox *next = scheduler_get_next();
assert(next != NULL);
/* If current equals return, we are already running earliest deadline, so resume execution */
if (current == next) return;
/* Save the context of the currently executing sandbox before switching from it */
sandbox_set_as_preempted(current, SANDBOX_RUNNING);
arch_mcontext_save(&current->ctxt, &user_context->uc_mcontext);
/* Update current_sandbox to the next sandbox */
assert(next->state == SANDBOX_RUNNABLE);
sandbox_set_as_running(next, SANDBOX_RUNNABLE);
/* Update the current deadline of the worker thread */
runtime_worker_threads_deadline[worker_thread_idx] = next->absolute_deadline;
/* Restore the context of this sandbox */
arch_context_restore_new(&user_context->uc_mcontext, &next->ctxt);
}
/**
* @brief Switches to the next sandbox, placing the current sandbox on the completion queue if in SANDBOX_RETURNED state
* @param next_sandbox The Sandbox Context to switch to
*/
static inline void
scheduler_switch_to(struct sandbox *next_sandbox)
{
/* Assumption: The caller disables interrupts */
assert(!software_interrupt_is_enabled());
assert(next_sandbox != NULL);
struct arch_context *next_context = &next_sandbox->ctxt;
/* Get the old sandbox we're switching from.
* This is null if switching from base context
*/
struct sandbox * current_sandbox = current_sandbox_get();
struct arch_context *current_context = NULL;
if (current_sandbox != NULL) current_context = &current_sandbox->ctxt;
assert(next_sandbox != current_sandbox);
/* Update the worker's absolute deadline */
runtime_worker_threads_deadline[worker_thread_idx] = next_sandbox->absolute_deadline;
if (current_sandbox == NULL) {
/* Switching from "Base Context" */
#ifdef LOG_CONTEXT_SWITCHES
debuglog("Base Context (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", &worker_thread_base_context,
arch_context_variant_print(worker_thread_base_context.variant), next_sandbox->id, next_context,
arch_context_variant_print(next_context->variant));
#endif
} else {
#ifdef LOG_CONTEXT_SWITCHES
debuglog("Sandbox %lu (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", current_sandbox->id,
&current_sandbox->ctxt, arch_context_variant_print(current_sandbox->ctxt.variant),
next_sandbox->id, &next_sandbox->ctxt, arch_context_variant_print(next_context->variant));
#endif
sandbox_exit(current_sandbox);
}
sandbox_set_as_running(next_sandbox, next_sandbox->state);
arch_context_switch(current_context, next_context);
}
/**
* @brief Switches to the base context, placing the current sandbox on the completion queue if in RETURNED state
*/
static inline void
scheduler_yield()
{
assert(!software_interrupt_is_enabled());
struct sandbox *current_sandbox = current_sandbox_get();
#ifndef NDEBUG
if (current_sandbox != NULL) {
assert(current_sandbox->state < SANDBOX_STATE_COUNT);
assert(current_sandbox->stack_size == current_sandbox->module->stack_size);
}
#endif
/* Assumption: Base Context should never switch to Base Context */
assert(current_sandbox != NULL);
struct arch_context *current_context = &current_sandbox->ctxt;
assert(current_context != &worker_thread_base_context);
#ifdef LOG_CONTEXT_SWITCHES
debuglog("Sandbox %lu (@%p) (%s) > Base Context (@%p) (%s)\n", current_sandbox->id, current_context,
arch_context_variant_print(current_sandbox->ctxt.variant), &worker_thread_base_context,
arch_context_variant_print(worker_thread_base_context.variant));
#endif
sandbox_exit(current_sandbox);
current_sandbox_set(NULL);
assert(worker_thread_base_context.variant == ARCH_CONTEXT_VARIANT_FAST);
runtime_worker_threads_deadline[worker_thread_idx] = UINT64_MAX;
arch_context_switch(current_context, &worker_thread_base_context);
}
/**
* Mark the currently executing sandbox as blocked, remove it from the local runqueue,
* and switch to base context
*/
static inline void
scheduler_block(void)
{
/* Remove the sandbox we were just executing from the runqueue and mark as blocked */
struct sandbox *current_sandbox = current_sandbox_get();
/* We might either have blocked in start reading the request or while executing within the WebAssembly
* entrypoint. The preemptable flag on the context is used to differentiate. In either case, we should
* have disabled interrupts.
*/
if (current_sandbox->ctxt.preemptable) software_interrupt_disable();
assert(!software_interrupt_is_enabled());
assert(current_sandbox->state == SANDBOX_RUNNING);
sandbox_set_as_blocked(current_sandbox, SANDBOX_RUNNING);
generic_thread_dump_lock_overhead();
scheduler_yield();
}

@ -0,0 +1,82 @@
#pragma once
#include <assert.h>
#include <errno.h>
#include "client_socket.h"
#include "panic.h"
#include "runtime.h"
#include "sandbox_functions.h"
#include "sandbox_set_as_error.h"
#include "sandbox_set_as_runnable.h"
#include "sandbox_state.h"
#include "sandbox_types.h"
#include "software_interrupt.h"
#include "worker_thread.h"
/**
* Run all outstanding events in the local thread's epoll loop
*/
static inline void
worker_thread_execute_epoll_loop(void)
{
assert(!software_interrupt_is_enabled());
while (true) {
struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS];
int descriptor_count = epoll_wait(worker_thread_epoll_file_descriptor, epoll_events,
RUNTIME_MAX_EPOLL_EVENTS, 0);
if (descriptor_count < 0) {
if (errno == EINTR) continue;
panic_err();
}
if (descriptor_count == 0) break;
for (int i = 0; i < descriptor_count; i++) {
if (epoll_events[i].events & (EPOLLIN | EPOLLOUT)) {
/* Re-add to runqueue if blocked */
struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr;
assert(sandbox);
if (sandbox->state == SANDBOX_BLOCKED) {
sandbox_set_as_runnable(sandbox, SANDBOX_BLOCKED);
}
} else if (epoll_events[i].events & (EPOLLERR | EPOLLHUP)) {
/* Mystery: This seems to never fire. Why? Issue #130 */
/* Close socket and set as error on socket error or unexpected client hangup */
struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr;
int error = 0;
socklen_t errlen = sizeof(error);
getsockopt(epoll_events[i].data.fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error > 0) {
debuglog("Socket error: %s", strerror(error));
} else if (epoll_events[i].events & EPOLLHUP) {
debuglog("Client Hungup");
} else {
debuglog("Unknown Socket error");
}
switch (sandbox->state) {
case SANDBOX_SET_AS_RETURNED:
case SANDBOX_RETURNED:
case SANDBOX_SET_AS_COMPLETE:
case SANDBOX_COMPLETE:
case SANDBOX_SET_AS_ERROR:
case SANDBOX_ERROR:
panic("Expected to have closed socket");
default:
client_socket_send(sandbox->client_socket_descriptor, 503);
sandbox_close_http(sandbox);
sandbox_set_as_error(sandbox, sandbox->state);
}
} else {
panic("Mystery epoll event!\n");
};
}
}
}

@ -1,11 +1,11 @@
#include "current_sandbox.h" #include "current_sandbox.h"
#include "current_sandbox_yield.h"
#include "sandbox_functions.h" #include "sandbox_functions.h"
#include "sandbox_receive_request.h" #include "sandbox_receive_request.h"
#include "sandbox_send_response.h" #include "sandbox_send_response.h"
#include "sandbox_set_as_error.h" #include "sandbox_set_as_error.h"
#include "sandbox_set_as_returned.h" #include "sandbox_set_as_returned.h"
#include "sandbox_setup_arguments.h" #include "sandbox_setup_arguments.h"
#include "scheduler.h"
// /* current sandbox that is active.. */ // /* current sandbox that is active.. */
__thread struct sandbox *worker_thread_current_sandbox = NULL; __thread struct sandbox *worker_thread_current_sandbox = NULL;
@ -95,7 +95,7 @@ current_sandbox_start(void)
done: done:
/* Cleanup connection and exit sandbox */ /* Cleanup connection and exit sandbox */
generic_thread_dump_lock_overhead(); generic_thread_dump_lock_overhead();
current_sandbox_yield(); scheduler_yield();
/* This assert prevents a segfault discussed in /* This assert prevents a segfault discussed in
* https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66 * https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66

@ -11,7 +11,7 @@
#include <unistd.h> #include <unistd.h>
#include "current_sandbox.h" #include "current_sandbox.h"
#include "current_sandbox_block.h" #include "scheduler.h"
#include "sandbox_functions.h" #include "sandbox_functions.h"
#include "worker_thread.h" #include "worker_thread.h"
@ -114,7 +114,7 @@ wasm_read(int32_t filedes, int32_t buf_offset, int32_t nbyte)
int32_t length_read = (int32_t)read(filedes, buf, nbyte); int32_t length_read = (int32_t)read(filedes, buf, nbyte);
if (length_read < 0) { if (length_read < 0) {
if (errno == EAGAIN) if (errno == EAGAIN)
current_sandbox_block(); scheduler_block();
else { else {
/* All other errors */ /* All other errors */
debuglog("Error reading socket %d - %s\n", filedes, strerror(errno)); debuglog("Error reading socket %d - %s\n", filedes, strerror(errno));
@ -157,7 +157,7 @@ wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size)
int32_t length_written = (int32_t)write(f, buf, buf_size); int32_t length_written = (int32_t)write(f, buf, buf_size);
if (length_written < 0) { if (length_written < 0) {
if (errno == EAGAIN) if (errno == EAGAIN)
current_sandbox_block(); scheduler_block();
else { else {
/* All other errors */ /* All other errors */
debuglog("Error reading socket %d - %s\n", fd, strerror(errno)); debuglog("Error reading socket %d - %s\n", fd, strerror(errno));

@ -67,14 +67,3 @@ local_runqueue_get_next()
assert(local_runqueue.get_next_fn != NULL); assert(local_runqueue.get_next_fn != NULL);
return local_runqueue.get_next_fn(); return local_runqueue.get_next_fn();
}; };
/**
* Preempt the current sandbox according to the scheduler variant
* @param context
*/
void
local_runqueue_preempt(ucontext_t *context)
{
assert(local_runqueue.preempt_fn != NULL);
return local_runqueue.preempt_fn(context);
};

@ -88,17 +88,6 @@ local_runqueue_list_append(struct sandbox *sandbox_to_append)
ps_list_head_append_d(&local_runqueue_list, sandbox_to_append); ps_list_head_append_d(&local_runqueue_list, sandbox_to_append);
} }
/**
* Conditionally checks to see if current sandbox should be preempted.
* FIFO doesn't preempt, so just return.
*/
void
local_runqueue_list_preempt(ucontext_t *user_context)
{
return;
}
void void
local_runqueue_list_initialize() local_runqueue_list_initialize()
{ {
@ -108,7 +97,6 @@ local_runqueue_list_initialize()
struct local_runqueue_config config = { .add_fn = local_runqueue_list_append, struct local_runqueue_config config = { .add_fn = local_runqueue_list_append,
.is_empty_fn = local_runqueue_list_is_empty, .is_empty_fn = local_runqueue_list_is_empty,
.delete_fn = local_runqueue_list_remove, .delete_fn = local_runqueue_list_remove,
.get_next_fn = local_runqueue_list_get_next, .get_next_fn = local_runqueue_list_get_next };
.preempt_fn = local_runqueue_list_preempt };
local_runqueue_initialize(&config); local_runqueue_initialize(&config);
}; };

@ -70,121 +70,13 @@ local_runqueue_minheap_get_next()
{ {
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
struct sandbox * sandbox = NULL; /* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox_request *sandbox_request = NULL; struct sandbox *next = NULL;
int sandbox_rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&sandbox); int rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&next);
while (sandbox_rc == -ENOENT && global_request_scheduler_peek() < ULONG_MAX && sandbox == NULL) { if (rc == -ENOENT) return NULL;
/* local runqueue empty, try to pull a sandbox request */
if (global_request_scheduler_remove(&sandbox_request) < 0) {
/* Assumption: Sandbox request should not be set in case of an error */
assert(sandbox_request == NULL);
goto done;
}
/* Try to allocate a sandbox. Try again on failure */ return next;
sandbox = sandbox_allocate(sandbox_request);
if (!sandbox) {
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor, &sandbox->client_address);
free(sandbox_request);
continue;
};
assert(sandbox->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(sandbox, SANDBOX_INITIALIZED);
}
done:
return sandbox;
err:
sandbox = NULL;
goto done;
}
/**
* Called by the SIGALRM handler after a quantum
* Assumes the caller validates that there is something to preempt
* @param user_context - The context of our user-level Worker thread
*/
void
local_runqueue_minheap_preempt(ucontext_t *user_context)
{
assert(user_context != NULL);
assert(!software_interrupt_is_enabled());
struct sandbox *current_sandbox = current_sandbox_get();
/* If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course. */
if (current_sandbox == NULL) return;
/* The current sandbox should be the head of the runqueue */
assert(local_runqueue_minheap_is_empty() == false);
uint64_t local_deadline = priority_queue_peek(local_runqueue_minheap);
uint64_t global_deadline = global_request_scheduler_peek();
/* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */
struct sandbox_request *sandbox_request = NULL;
if (global_deadline < local_deadline) {
#ifdef LOG_PREEMPTION
debuglog("Sandbox %lu has deadline of %lu. Trying to preempt for request with %lu\n",
current_sandbox->id, local_deadline, global_deadline);
#endif
int return_code = global_request_scheduler_remove_if_earlier(&sandbox_request, local_deadline);
/* If we were unable to get a sandbox_request, exit */
if (return_code != 0) {
#ifdef LOG_PREEMPTION
debuglog("Preemption aborted. Another thread took the request\n");
#endif
/* Assumption: Sandbox request should not be set in case of an error */
assert(sandbox_request == NULL);
goto done;
}
assert(sandbox_request->absolute_deadline < local_deadline);
#ifdef LOG_PREEMPTION
debuglog("Preempted %lu for %lu\n", local_deadline, sandbox_request->absolute_deadline);
#endif
/* Allocate the request */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) goto err_sandbox_allocate;
/* Set as runnable and add it to the runqueue */
assert(next_sandbox->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(next_sandbox, SANDBOX_INITIALIZED);
assert(current_sandbox->state == SANDBOX_RUNNING);
sandbox_set_as_preempted(current_sandbox, SANDBOX_RUNNING);
/* Save the context of the currently executing sandbox before switching from it */
arch_mcontext_save(&current_sandbox->ctxt, &user_context->uc_mcontext);
/* Update current_sandbox to the next sandbox */
assert(next_sandbox->state == SANDBOX_RUNNABLE);
sandbox_set_as_running(next_sandbox, SANDBOX_RUNNABLE);
/*
* Restore the context of this new sandbox
* user-level context switch state, so do not enable software interrupts.
* TODO: Review the interrupt logic here. Issue #63
*/
runtime_worker_threads_deadline[worker_thread_idx] = next_sandbox->absolute_deadline;
assert(!software_interrupt_is_enabled());
arch_context_restore_new(&user_context->uc_mcontext, &next_sandbox->ctxt);
}
done:
return;
err_sandbox_allocate:
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor, &sandbox_request->socket_address);
debuglog("local_runqueue_minheap_preempt failed to allocate sandbox\n");
err:
goto done;
} }
/** /**
@ -201,8 +93,7 @@ local_runqueue_minheap_initialize()
struct local_runqueue_config config = { .add_fn = local_runqueue_minheap_add, struct local_runqueue_config config = { .add_fn = local_runqueue_minheap_add,
.is_empty_fn = local_runqueue_minheap_is_empty, .is_empty_fn = local_runqueue_minheap_is_empty,
.delete_fn = local_runqueue_minheap_delete, .delete_fn = local_runqueue_minheap_delete,
.get_next_fn = local_runqueue_minheap_get_next, .get_next_fn = local_runqueue_minheap_get_next };
.preempt_fn = local_runqueue_minheap_preempt };
local_runqueue_initialize(&config); local_runqueue_initialize(&config);
} }

@ -1,25 +1,12 @@
#include <arpa/inet.h>
#include <assert.h> #include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <sys/mman.h> #include <sys/mman.h>
#include "admissions_control.h"
#include "current_sandbox.h" #include "current_sandbox.h"
#include "debuglog.h" #include "debuglog.h"
#include "http_parser_settings.h"
#include "http_total.h"
#include "local_completion_queue.h"
#include "local_runqueue.h"
#include "likely.h"
#include "panic.h" #include "panic.h"
#include "runtime.h"
#include "sandbox_exit.h"
#include "sandbox_functions.h" #include "sandbox_functions.h"
#include "sandbox_set_as_error.h" #include "sandbox_set_as_error.h"
#include "sandbox_set_as_initialized.h" #include "sandbox_set_as_initialized.h"
#include "sandbox_set_as_running.h"
#include "worker_thread.h"
/** /**
* Close the sandbox's ith io_handle * Close the sandbox's ith io_handle
@ -240,49 +227,3 @@ err_free_stack_failed:
/* Errors freeing memory is a fatal error */ /* Errors freeing memory is a fatal error */
panic("Failed to free Sandbox %lu\n", sandbox->id); panic("Failed to free Sandbox %lu\n", sandbox->id);
} }
/**
* @brief Switches to the next sandbox, placing the current sandbox on the completion queue if in SANDBOX_RETURNED state
* @param next_sandbox The Sandbox Context to switch to
*/
void
sandbox_switch_to(struct sandbox *next_sandbox)
{
/* Assumption: The caller disables interrupts */
assert(!software_interrupt_is_enabled());
assert(next_sandbox != NULL);
struct arch_context *next_context = &next_sandbox->ctxt;
/* Get the old sandbox we're switching from.
* This is null if switching from base context
*/
struct sandbox * current_sandbox = current_sandbox_get();
struct arch_context *current_context = NULL;
if (current_sandbox != NULL) current_context = &current_sandbox->ctxt;
assert(next_sandbox != current_sandbox);
/* Update the worker's absolute deadline */
runtime_worker_threads_deadline[worker_thread_idx] = next_sandbox->absolute_deadline;
if (current_sandbox == NULL) {
/* Switching from "Base Context" */
#ifdef LOG_CONTEXT_SWITCHES
debuglog("Base Context (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", &worker_thread_base_context,
arch_context_variant_print(worker_thread_base_context.variant), next_sandbox->id, next_context,
arch_context_variant_print(next_context->variant));
#endif
} else {
#ifdef LOG_CONTEXT_SWITCHES
debuglog("Sandbox %lu (@%p) (%s) > Sandbox %lu (@%p) (%s)\n", current_sandbox->id,
&current_sandbox->ctxt, arch_context_variant_print(current_sandbox->ctxt.variant),
next_sandbox->id, &next_sandbox->ctxt, arch_context_variant_print(next_context->variant));
#endif
sandbox_exit(current_sandbox);
}
sandbox_set_as_running(next_sandbox, next_sandbox->state);
arch_context_switch(current_context, next_context);
}

@ -18,6 +18,7 @@
#include "panic.h" #include "panic.h"
#include "runtime.h" #include "runtime.h"
#include "sandbox_types.h" #include "sandbox_types.h"
#include "scheduler.h"
#include "software_interrupt.h" #include "software_interrupt.h"
/******************* /*******************
@ -124,11 +125,7 @@ sigalrm_handler(siginfo_t *signal_info, ucontext_t *user_context, struct sandbox
assert(current_sandbox->state != SANDBOX_RETURNED); assert(current_sandbox->state != SANDBOX_RETURNED);
/* Preempt */ /* Preempt */
local_runqueue_preempt(user_context); scheduler_preempt(user_context);
/* We have to call current_sandbox_get because the argument potentially points to what
* was just preempted */
if (current_sandbox_get()->ctxt.preemptable) software_interrupt_enable();
return; return;
} }
@ -204,6 +201,7 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
break; break;
} }
case SIGUSR1: { case SIGUSR1: {
assert(!software_interrupt_is_enabled());
sigusr1_handler(signal_info, user_context, current_sandbox); sigusr1_handler(signal_info, user_context, current_sandbox);
break; break;
} }
@ -220,6 +218,12 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
} }
} }
atomic_fetch_sub(&software_interrupt_signal_depth, 1); atomic_fetch_sub(&software_interrupt_signal_depth, 1);
/* Reenable software interrupts if we restored a preemptable sandbox
* We explicitly call current_sandbox_get becaue it might have been changed by a handler
*/
current_sandbox = current_sandbox_get();
if (current_sandbox && current_sandbox->ctxt.preemptable) software_interrupt_enable();
} }
/******************** /********************

@ -4,22 +4,17 @@
#include <signal.h> #include <signal.h>
#include <sched.h> #include <sched.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/mman.h>
#include "client_socket.h"
#include "current_sandbox.h" #include "current_sandbox.h"
#include "debuglog.h"
#include "global_request_scheduler.h"
#include "local_completion_queue.h" #include "local_completion_queue.h"
#include "local_runqueue.h" #include "local_runqueue.h"
#include "local_runqueue_list.h" #include "local_runqueue_list.h"
#include "local_runqueue_minheap.h" #include "local_runqueue_minheap.h"
#include "panic.h" #include "panic.h"
#include "runtime.h" #include "runtime.h"
#include "sandbox_functions.h" #include "scheduler.h"
#include "sandbox_set_as_runnable.h"
#include "sandbox_set_as_error.h"
#include "worker_thread.h" #include "worker_thread.h"
#include "worker_thread_execute_epoll_loop.h"
/*************************** /***************************
* Worker Thread State * * Worker Thread State *
@ -37,72 +32,6 @@ __thread int worker_thread_idx;
* Worker Thread Logic * * Worker Thread Logic *
**********************/ **********************/
/**
* Run all outstanding events in the local thread's epoll loop
*/
static inline void
worker_thread_execute_epoll_loop(void)
{
assert(software_interrupt_is_disabled);
while (true) {
struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS];
int descriptor_count = epoll_wait(worker_thread_epoll_file_descriptor, epoll_events,
RUNTIME_MAX_EPOLL_EVENTS, 0);
if (descriptor_count < 0) {
if (errno == EINTR) continue;
panic_err();
}
if (descriptor_count == 0) break;
for (int i = 0; i < descriptor_count; i++) {
if (epoll_events[i].events & (EPOLLIN | EPOLLOUT)) {
/* Re-add to runqueue if blocked */
struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr;
assert(sandbox);
if (sandbox->state == SANDBOX_BLOCKED) {
sandbox_set_as_runnable(sandbox, SANDBOX_BLOCKED);
}
} else if (epoll_events[i].events & (EPOLLERR | EPOLLHUP)) {
/* Mystery: This seems to never fire. Why? Issue #130 */
/* Close socket and set as error on socket error or unexpected client hangup */
struct sandbox *sandbox = (struct sandbox *)epoll_events[i].data.ptr;
int error = 0;
socklen_t errlen = sizeof(error);
getsockopt(epoll_events[i].data.fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen);
if (error > 0) {
debuglog("Socket error: %s", strerror(error));
} else if (epoll_events[i].events & EPOLLHUP) {
debuglog("Client Hungup");
} else {
debuglog("Unknown Socket error");
}
switch (sandbox->state) {
case SANDBOX_SET_AS_RETURNED:
case SANDBOX_RETURNED:
case SANDBOX_SET_AS_COMPLETE:
case SANDBOX_COMPLETE:
case SANDBOX_SET_AS_ERROR:
case SANDBOX_ERROR:
panic("Expected to have closed socket");
default:
client_socket_send(sandbox->client_socket_descriptor, 503);
sandbox_close_http(sandbox);
sandbox_set_as_error(sandbox, sandbox->state);
}
} else {
panic("Mystery epoll event!\n");
};
}
}
}
/** /**
* The entry function for sandbox worker threads * The entry function for sandbox worker threads
* Initializes thread-local state, unmasks signals, sets up epoll loop and * Initializes thread-local state, unmasks signals, sets up epoll loop and
@ -159,8 +88,8 @@ worker_thread_main(void *argument)
worker_thread_execute_epoll_loop(); worker_thread_execute_epoll_loop();
/* Switch to a sandbox if one is ready to run */ /* Switch to a sandbox if one is ready to run */
next_sandbox = local_runqueue_get_next(); next_sandbox = scheduler_get_next();
if (next_sandbox != NULL) { sandbox_switch_to(next_sandbox); } if (next_sandbox != NULL) { scheduler_switch_to(next_sandbox); }
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
/* Clear the completion queue */ /* Clear the completion queue */

Loading…
Cancel
Save