Merge pull request #74 from phanikishoreg/documentation

chore: port simple documentation and error handling
main
Sean McBride 4 years ago committed by GitHub
commit 87795f580f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,6 +4,8 @@
#include <unistd.h>
#include <ucontext.h>
#include "arch_context.h"
#define ARCH_NREGS (2) /* SP + PC only */
#define ARCH_SIG_JMP_OFF 0x100 /* Based on code generated! */
@ -18,7 +20,6 @@ struct arch_context {
mcontext_t mctx;
};
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void);
extern __thread struct arch_context worker_thread_base_context;
/* Initialized a context, zeroing out registers and setting the Instruction and Stack pointers */
@ -104,7 +105,7 @@ arch_context_switch(struct arch_context *ca, struct arch_context *na)
".align 8\n\t"
"exit%=:\n\t"
:
: [ curr ] "r"(cr), [ next ] "r"(nr), [ slowpath ] "r"(&worker_thread_sandbox_switch_preempt)
: [ curr ] "r"(cr), [ next ] "r"(nr), [ slowpath ] "r"(&arch_context_mcontext_restore)
: "memory", "cc", "x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12",
"x13", "x14", "x15", "x16", "x17", "x18", "x19", "x20", "x21", "x22", "x23", "x24", "x25", "x26",
"d8", "d9", "d10", "d11", "d12", "d13", "d14", "d15");

@ -7,6 +7,7 @@
#include <ucontext.h>
#include <unistd.h>
#include "arch_context.h"
#include "software_interrupt.h"
#define ARCH_SIG_JMP_OFF 8
@ -31,7 +32,6 @@ struct arch_context {
mcontext_t mctx;
};
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void);
extern __thread struct arch_context worker_thread_base_context;
static void __attribute__((noinline)) arch_context_init(struct arch_context *actx, reg_t ip, reg_t sp)
@ -44,14 +44,15 @@ static void __attribute__((noinline)) arch_context_init(struct arch_context *act
* context_switch conventions: bp is expected to be on top of the stack
* when co-op context switching..
*
* so push sp on this new stack and use
* that new sp as sp for switching to sandbox!
* Temporarily switches the active stack to the stack pointer stored in sp
* to push the stack pointer sp to the top of its own stack.
* This acts as the base pointer
*/
asm volatile("movq %%rsp, %%rbx\n\t"
"movq %%rax, %%rsp\n\t"
"pushq %%rax\n\t"
"movq %%rsp, %%rax\n\t"
"movq %%rbx, %%rsp\n\t"
asm volatile("movq %%rsp, %%rbx\n\t" /* Temporarily save pointer of active stack to B */
"movq %%rax, %%rsp\n\t" /* Set active stack to stack pointer in A(C variable sp) */
"pushq %%rax\n\t" /* Push A(C variable sp) onto the stack at sp */
"movq %%rsp, %%rax\n\t" /* Write the incremented stack pointer to A(C variable sp) */
"movq %%rbx, %%rsp\n\t" /* Restore original stack saved in B */
: "=a"(sp)
: "a"(sp)
: "memory", "cc", "rbx");
@ -122,6 +123,9 @@ arch_context_switch(struct arch_context *current, struct arch_context *next)
/* if both current and next are NULL, there is no state change */
assert(current != NULL || next != NULL);
/* Assumption: The caller does not switch to itself */
assert(current != next);
/* Set any NULLs to worker_thread_base_context to resume execution of main */
if (current == NULL) current = &worker_thread_base_context;
if (next == NULL) next = &worker_thread_base_context;
@ -129,27 +133,58 @@ arch_context_switch(struct arch_context *current, struct arch_context *next)
reg_t *current_registers = current->regs, *next_registers = next->regs;
assert(current_registers && next_registers);
asm volatile("pushq %%rbp\n\t"
"movq %%rsp, %%rbp\n\t"
"movq $2f, 8(%%rax)\n\t"
"movq %%rsp, (%%rax)\n\t"
"cmpq $0, (%%rbx)\n\t"
"je 1f\n\t"
"movq (%%rbx), %%rsp\n\t"
"jmpq *8(%%rbx)\n\t"
asm volatile(
/* Create a new stack frame */
"pushq %%rbp\n\t" /* stack[stack_len++] = base_pointer */
"movq %%rsp, %%rbp\n\t" /* base_pointer = stack_pointer. Start new Frame */
/*
* Save the IP and stack pointer to the context of the sandbox we're switching from
*/
"movq $2f, 8(%%rax)\n\t" /* Write the address of label 2 to current_registers[1] (instruction_pointer). */
"movq %%rsp, (%%rax)\n\t" /* current_registers[0] (stack_pointer) = stack_pointer */
/*
* Check if the variant of the context we're trying to switch to is SLOW (mcontext-based)
* If it is, jump to label 1 to restore the preempted sandbox
*/
"cmpq $0, (%%rbx)\n\t" /* if (stack pointer == 0) */
"je 1f\n\t" /* goto 1; restore the existing sandbox using mcontext */
/*
* Fast Path
* We can just write update the stack pointer and jump to the target instruction
*/
"movq (%%rbx), %%rsp\n\t" /* stack_pointer = next_registers[0] (stack_pointer) */
"jmpq *8(%%rbx)\n\t" /* immediate jump to next_registers[1] (instruction_pointer) */
/*
* Slow Path
* If the stack pointer equaled 0, that means the sandbox was preempted and we need to
* fallback to a full mcontext-based context switch. We do this by invoking
* arch_context_mcontext_restore, which fires a SIGUSR1 signal. The SIGUSR1 signal handler
* executes the mcontext-based context switch.
*/
"1:\n\t"
"call worker_thread_sandbox_switch_preempt\n\t"
"call arch_context_mcontext_restore\n\t"
".align 8\n\t"
/*
* Where preempted sandbox resumes
* rbx contains the preempted sandbox's IP and SP in this context
*/
"2:\n\t"
"movq $0, (%%rbx)\n\t"
"movq $0, (%%rbx)\n\t" /* stack pointer = 0 */
".align 8\n\t"
/* This label is used in conjunction with a static offset */
"3:\n\t"
"popq %%rbp\n\t"
"popq %%rbp\n\t" /* base_pointer = stack[--stack_len]; Base Pointer is restored */
:
: "a"(current_registers), "b"(next_registers)
: "memory", "cc", "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15",
"xmm0", "xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7", "xmm8", "xmm9", "xmm10", "xmm11",
"xmm12", "xmm13", "xmm14", "xmm15");
: "memory", "cc", "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", "xmm0",
"xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7", "xmm8", "xmm9", "xmm10", "xmm11", "xmm12", "xmm13",
"xmm14", "xmm15");
return 0;
}

@ -0,0 +1 @@
void __attribute__((noinline)) __attribute__((noreturn)) arch_context_mcontext_restore(void);

@ -44,9 +44,10 @@ struct sandbox {
struct arch_context ctxt; /* register context for context switch. */
uint64_t total_time;
uint64_t start_time;
uint64_t request_arrival_timestamp;
uint64_t absolute_deadline;
uint64_t total_time;
struct module *module; /* the module this is an instance of */
@ -85,7 +86,6 @@ extern __thread struct arch_context *worker_thread_next_context;
extern void worker_thread_block_current_sandbox(void);
extern void worker_thread_on_sandbox_exit(struct sandbox *sandbox);
extern void worker_thread_process_io(void);
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void);
extern void worker_thread_wakeup_sandbox(struct sandbox *sandbox);
/***************************

@ -14,7 +14,7 @@ struct sandbox_request {
char * arguments;
int socket_descriptor;
struct sockaddr *socket_address;
uint64_t start_time; /* cycles */
uint64_t request_arrival_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */
};
@ -26,12 +26,12 @@ DEQUE_PROTOTYPE(sandbox, struct sandbox_request *);
* @param arguments the arguments that we'll pass to the serverless function
* @param socket_descriptor
* @param socket_address
* @param start_time the timestamp of when we receives the request from the network (in cycles)
* @param request_arrival_timestamp the timestamp of when we receives the request from the network (in cycles)
* @return the new sandbox request
*/
static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor,
const struct sockaddr *socket_address, uint64_t start_time)
const struct sockaddr *socket_address, uint64_t request_arrival_timestamp)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
assert(sandbox_request);
@ -39,8 +39,9 @@ sandbox_request_allocate(struct module *module, char *arguments, int socket_desc
sandbox_request->arguments = arguments;
sandbox_request->socket_descriptor = socket_descriptor;
sandbox_request->socket_address = (struct sockaddr *)socket_address;
sandbox_request->start_time = start_time;
sandbox_request->absolute_deadline = start_time + module->relative_deadline_us * runtime_processor_speed_MHz;
sandbox_request->request_arrival_timestamp = request_arrival_timestamp;
sandbox_request->absolute_deadline = request_arrival_timestamp
+ module->relative_deadline_us * runtime_processor_speed_MHz;
debuglog("[%p: %s]\n", sandbox_request, sandbox_request->module->name);
return sandbox_request;

@ -0,0 +1,16 @@
#include <signal.h>
#include <pthread.h>
#include "types.h"
/**
* Called by the inline assembly in arch_context_switch to send a SIGUSR1 in order to restore a previously preempted
* thread. The only way to restore all of the mcontext registers of a preempted sandbox is to send ourselves a signal,
* then update the registers we should return to, then sigreturn (by returning from the handler). This returns to the
* control flow restored from the mcontext
*/
void __attribute__((noinline)) __attribute__((noreturn)) arch_context_mcontext_restore(void)
{
pthread_kill(pthread_self(), SIGUSR1);
assert(false);
}

@ -55,13 +55,13 @@ local_runqueue_minheap_delete(struct sandbox *sandbox)
int rc = priority_queue_delete(&local_runqueue_minheap, sandbox);
if (rc == -1) {
panic("Err: Thread Local %lu tried to delete sandbox %lu from runqueue, but was not present\n",
pthread_self(), sandbox->start_time);
pthread_self(), sandbox->request_arrival_timestamp);
}
}
/**
* This function determines the next sandbox to run. This is either the head of the runqueue or the head of the request
*queue
* This function determines the next sandbox to run.
* This is either the head of the runqueue or the head of the request queue
*
* Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
@ -100,14 +100,19 @@ local_runqueue_minheap_get_next()
/**
* Conditionally checks to see if current sandbox should be preempted
* Called by the SIGALRM handler after a quantum
* Assumes the caller validates that there is something to preempt
* @param user_context - The context of our user-level Worker thread
*/
void
local_runqueue_minheap_preempt(ucontext_t *user_context)
{
assert(user_context != NULL);
software_interrupt_disable(); /* no nesting! */
struct sandbox *current_sandbox = current_sandbox_get();
/* If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course. */
if (current_sandbox == NULL) {
software_interrupt_enable();
@ -117,10 +122,6 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* The current sandbox should be the head of the runqueue */
assert(local_runqueue_minheap_is_empty() == false);
// TODO: Factor quantum and/or sandbox allocation time into decision
// uint64_t global_deadline = global_request_scheduler_peek() -
// software_interrupt_interval_duration_in_cycles;
bool should_enable_software_interrupt = true;
uint64_t local_deadline = priority_queue_peek(&local_runqueue_minheap);
uint64_t global_deadline = global_request_scheduler_peek();
@ -128,9 +129,17 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* Our local deadline should only be ULONG_MAX if our local runqueue is empty */
if (local_deadline == ULONG_MAX) { assert(local_runqueue_minheap.first_free == 1); };
/* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */
struct sandbox_request *sandbox_request;
/*
* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it
*
* TODO: Factor quantum and/or sandbox allocation time into decision
* Something like global_request_scheduler_peek() - software_interrupt_interval_duration_in_cycles;
*/
if (global_deadline < local_deadline) {
debuglog("Thread %lu | Sandbox %lu | Had deadline of %lu. Trying to preempt for request with %lu\n",
pthread_self(), current_sandbox->allocation_timestamp, local_deadline, global_deadline);
struct sandbox_request *sandbox_request;
int return_code = global_request_scheduler_remove(&sandbox_request);
@ -155,9 +164,11 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* Update current_sandbox to the next sandbox */
current_sandbox_set(next_sandbox);
/* And load the context of this new sandbox
RC of 1 indicates that sandbox was last in a user-level context switch state,
so do not enable software interrupts. */
/*
* And load the context of this new sandbox
* RC of 1 indicates that sandbox was last in a user-level context switch state,
* so do not enable software interrupts.
*/
if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1)
should_enable_software_interrupt = false;
}

@ -74,7 +74,7 @@ listener_thread_main(void *dummy)
LISTENER_THREAD_MAX_EPOLL_EVENTS, -1);
/* Capture Start Time to calculate absolute deadline */
uint64_t start_time = __getcycles();
uint64_t request_arrival_timestamp = __getcycles();
for (int i = 0; i < request_count; i++) {
if (epoll_events[i].events & EPOLLERR) {
perror("epoll_wait");
@ -97,7 +97,7 @@ listener_thread_main(void *dummy)
/* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, module->name, socket_descriptor,
(const struct sockaddr *)&client_address, start_time);
(const struct sockaddr *)&client_address, request_arrival_timestamp);
assert(sandbox_request);
/* Add to the Global Sandbox Request Scheduler */

@ -79,7 +79,8 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
r = recv(sandbox->client_socket_descriptor, (sandbox->request_response_data), sandbox->module->max_request_size,
0);
if (r <= 0) {
if (r < 0) perror("recv1");
if (r < 0) perror("Error reading request data from client socket");
if (r == 0) perror("Client unexpectedly returned zero bytes");
return r;
}
while (r > 0) {
@ -101,7 +102,10 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
libuv_callbacks_on_allocate_setup_request_response_data,
libuv_callbacks_on_read_parse_http_request);
worker_thread_process_io();
if (sandbox->request_response_data_length == 0) return 0;
if (sandbox->request_response_data_length == 0) {
perror("request_response_data_length was unexpectedly 0");
return 0
};
#endif
return 1;
}
@ -147,7 +151,7 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox)
done:
assert(sndsz == sandbox->request_response_data_length);
uint64_t end_time = __getcycles();
sandbox->total_time = end_time - sandbox->start_time;
sandbox->total_time = end_time - sandbox->request_arrival_timestamp;
uint64_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz;
debuglog("%s():%d, %u, %lu\n", sandbox->module->name, sandbox->module->port,
@ -244,6 +248,8 @@ current_sandbox_main(void)
assert(sandbox != NULL);
assert(sandbox->state == SANDBOX_RUNNABLE);
char *error_message = "";
assert(!software_interrupt_is_enabled());
arch_context_init(&sandbox->ctxt, 0, 0);
worker_thread_next_context = NULL;
@ -255,7 +261,10 @@ current_sandbox_main(void)
/* Parse the request. 1 = Success */
int rc = sandbox_receive_and_parse_client_request(sandbox);
if (rc != 1) goto done;
if (rc != 1) {
error_message = "Unable to receive and parse client request\n";
goto err;
};
/* Initialize the module */
struct module *current_module = sandbox_get_module(sandbox);
@ -271,7 +280,11 @@ current_sandbox_main(void)
sandbox->return_value = module_main(current_module, argument_count, sandbox->arguments_offset);
/* Retrieve the result, construct the HTTP response, and send to client */
sandbox_build_and_send_client_response(sandbox);
rc = sandbox_build_and_send_client_response(sandbox);
if (rc == -1) {
error_message = "Unable to build and send client response\n";
goto err;
};
done:
/* Cleanup connection and exit sandbox */
@ -282,6 +295,9 @@ done:
* https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66
*/
assert(0);
err:
fprintf(stderr, "%s", error_message);
goto done;
}
/**
@ -371,27 +387,32 @@ sandbox_allocate(struct sandbox_request *sandbox_request)
assert(sandbox_request->module != NULL);
assert(module_is_valid(sandbox_request->module));
char * error_message = NULL;
char * error_message = "";
int rc;
struct sandbox *sandbox = NULL;
/* Allocate Sandbox control structures, buffers, and linear memory in a 4GB address space */
errno = 0;
sandbox = (struct sandbox *)sandbox_allocate_memory(sandbox_request->module);
if (!sandbox) goto err_memory_allocation_failed;
if (!sandbox) {
error_message = "failed to allocate sandbox heap and linear memory";
goto err_memory_allocation_failed;
}
/* Set state to initializing */
sandbox->state = SANDBOX_INITIALIZING;
/* Allocate the Stack */
rc = sandbox_allocate_stack(sandbox);
if (rc != 0) goto err_stack_allocation_failed;
if (rc != 0) {
error_message = "failed to allocate sandbox heap and linear memory";
goto err_stack_allocation_failed;
}
/* Copy the socket descriptor, address, and arguments of the client invocation */
sandbox->absolute_deadline = sandbox_request->absolute_deadline;
sandbox->arguments = (void *)sandbox_request->arguments;
sandbox->client_socket_descriptor = sandbox_request->socket_descriptor;
sandbox->start_time = sandbox_request->start_time;
sandbox->request_arrival_timestamp = sandbox_request->request_arrival_timestamp;
/* Initialize the sandbox's context, stack, and instruction pointer */
arch_context_init(&sandbox->ctxt, (reg_t)current_sandbox_main,
@ -415,6 +436,7 @@ err_stack_allocation_failed:
err_memory_allocation_failed:
err:
perror(error_message);
sandbox = NULL;
goto done;
}

@ -11,6 +11,7 @@
#include "current_sandbox.h"
#include "local_runqueue.h"
#include "module.h"
#include "panic.h"
#include "runtime.h"
#include "sandbox.h"
#include "software_interrupt.h"
@ -64,53 +65,81 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
case SIGALRM: {
/* SIGALRM is the preemption signal that occurs every quantum of execution */
/* A POSIX signal is delivered to one of the threads in our process.If sent by the kernel, "broadcast"
* by forwarding to all all threads */
/*
* A POSIX signal is delivered to only one thread.
* We need to ensure this thread broadcasts to all other threads
*/
if (signal_info->si_code == SI_KERNEL) {
/* Signal was sent directly by the kernel, so forward to other threads */
for (int i = 0; i < runtime_total_worker_processors; i++) {
if (pthread_self() != runtime_worker_threads[i]) {
if (pthread_self() == runtime_worker_threads[i]) continue;
pthread_kill(runtime_worker_threads[i], SIGALRM);
}
}
} else {
/* If not sent by the kernel, this should be a signal forwarded from another thread */
/* Signal forwarded from another thread. Just confirm it resulted from pthread_kill */
assert(signal_info->si_code == SI_TKILL);
}
// debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
software_interrupt_SIGALRM_count++;
/* if the current sandbox is NULL or not in a returned state */
if (current_sandbox && current_sandbox->state == SANDBOX_RETURNED) return;
/* and the next context is NULL */
if (worker_thread_next_context) return;
/* and software interrupts are not disabled */
/* NOOP if software interrupts not enabled */
if (!software_interrupt_is_enabled()) return;
/* Do not allow more than one layer of preemption */
if (worker_thread_next_context) return;
/*
* if a SIGALRM fires while the worker thread is between sandboxes, executing libuv, completion queue
* cleanup, etc. current_sandbox might be NULL. In this case, we should just allow return to allow the
* worker thread to run the main loop until it loads a new sandbox.
*
* TODO: Consider if this should be an invarient and the worker thread should disable software
* interrupts when doing this work.
*/
if (!current_sandbox) return;
/*
* if a SIGALRM fires while the worker thread executing cleanup of a sandbox, it might be in a RETURNED
* state. In this case, we should just allow return to allow the sandbox to complete cleanup, as it is
* about to switch to a new sandbox.
*
* TODO: Consider if this should be an invarient and the worker thread should disable software
* interrupts when doing this work.
*/
if (current_sandbox->state == SANDBOX_RETURNED) return;
/* Preempt */
local_runqueue_preempt(user_context);
return;
}
case SIGUSR1: { /* SIGUSR1 restores the preempted sandbox stored in worker_thread_next_context. */
/* Make sure *sigalrm doesn't mess this up if nested.. */
case SIGUSR1: {
/* SIGUSR1 restores a preempted sandbox using mcontext. */
/* Assumption: Caller disables interrupt before triggering SIGUSR1 */
assert(!software_interrupt_is_enabled());
/* we set current before calling pthread_kill! */
/* Assumption: Caller sets current_sandbox to the preempted sandbox */
assert(worker_thread_next_context && (&current_sandbox->ctxt == worker_thread_next_context));
assert(signal_info->si_code == SI_TKILL);
// debuglog("usr1:%d\n", software_interrupt_SIGUSR_count);
software_interrupt_SIGUSR_count++;
/* do not save current sandbox.. it is in co-operative switch..
pick the next from "worker_thread_next_context"..
assert its "sp" to be zero in regs..
memcpy from next context.. */
debuglog("usr1:%d\n", software_interrupt_SIGUSR_count);
arch_mcontext_restore(&user_context->uc_mcontext, &current_sandbox->ctxt);
worker_thread_next_context = NULL;
software_interrupt_enable();
break;
return;
}
default:
break;
if (signal_info->si_code == SI_TKILL) {
panic("Unexpectedly received signal %d from a thread kill, but we have no handler\n",
signal_type);
} else if (signal_info->si_code == SI_KERNEL) {
panic("Unexpectedly received signal %d from the kernel, but we have no handler\n", signal_type);
}
}
#endif
}

@ -30,7 +30,7 @@ __thread struct arch_context worker_thread_base_context;
__thread uv_loop_t worker_thread_uvio_handle;
/* Flag to signify if the thread is currently running callbacks in the libuv event loop */
static __thread bool worker_thread_is_in_callback;
static __thread bool worker_thread_is_in_libuv_event_loop = false;
/***********************
* Worker Thread Logic *
@ -84,8 +84,10 @@ worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
void
worker_thread_wakeup_sandbox(struct sandbox *sandbox)
{
assert(sandbox != NULL);
assert(sandbox->state == SANDBOX_BLOCKED);
software_interrupt_disable();
if (sandbox->state != SANDBOX_BLOCKED) goto done;
sandbox->state = SANDBOX_RUNNABLE;
debuglog("Marking blocked sandbox as runnable\n");
@ -103,7 +105,7 @@ done:
void
worker_thread_block_current_sandbox(void)
{
assert(worker_thread_is_in_callback == false);
assert(worker_thread_is_in_libuv_event_loop == false);
software_interrupt_disable();
/* Remove the sandbox we were just executing from the runqueue and mark as blocked */
@ -140,33 +142,19 @@ worker_thread_process_io(void)
#endif
}
/**
* We need to switch back to a previously preempted thread. The only way to restore all of its registers is to use
* sigreturn. To get to sigreturn, we need to send ourselves a signal, then update the registers we should return to,
* then sigreturn (by returning from the handler).
*/
void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void)
{
pthread_kill(pthread_self(), SIGUSR1);
assert(false); /* should not get here.. */
while (true)
;
}
/**
* Run all outstanding events in the local thread's libuv event loop
*/
void
worker_thread_execute_libuv_event_loop(void)
{
worker_thread_is_in_callback = true;
worker_thread_is_in_libuv_event_loop = true;
int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT), i = 0;
while (n > 0) {
n--;
uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT);
}
worker_thread_is_in_callback = false;
worker_thread_is_in_libuv_event_loop = false;
}
/**
@ -177,27 +165,40 @@ worker_thread_execute_libuv_event_loop(void)
void *
worker_thread_main(void *return_code)
{
/* Initialize Worker Infrastructure */
/* Initialize Base Context */
arch_context_init(&worker_thread_base_context, 0, 0);
/* Initialize Runqueue Variant */
// local_runqueue_list_initialize();
local_runqueue_minheap_initialize();
/* Initialize Completion Queue */
local_completion_queue_initialize();
/* Initialize Flags */
software_interrupt_is_disabled = false;
worker_thread_is_in_libuv_event_loop = false;
worker_thread_next_context = NULL;
/* Unmask signals */
#ifndef PREEMPT_DISABLE
software_interrupt_unmask_signal(SIGALRM);
software_interrupt_unmask_signal(SIGUSR1);
#endif
/* Initialize libuv event loop handle */
uv_loop_init(&worker_thread_uvio_handle);
worker_thread_is_in_callback = false;
/* Begin Worker Execution Loop */
struct sandbox *next_sandbox;
while (true) {
/* Assumption: current_sandbox should be unset at start of loop */
assert(current_sandbox_get() == NULL);
/* If "in a callback", the libuv event loop is triggering this, so we don't need to start it */
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
/* Execute libuv event loop */
if (!worker_thread_is_in_libuv_event_loop) worker_thread_execute_libuv_event_loop();
/* Switch to a sandbox if one is ready to run */
software_interrupt_disable();
next_sandbox = local_runqueue_get_next();
if (next_sandbox != NULL) {
@ -206,6 +207,7 @@ worker_thread_main(void *return_code)
software_interrupt_enable();
};
/* Clear the completion queue */
local_completion_queue_free();
}
@ -215,10 +217,10 @@ worker_thread_main(void *return_code)
/**
* Called when the function in the sandbox exits
* Removes the standbox from the thread-local runqueue, sets its state to SANDBOX_RETURNED,
* releases the linear memory, and then switches to the sandbox at the head of the runqueue
* releases the linear memory, and then returns to the base context
* TODO: Consider moving this to a future current_sandbox file. This has thus far proven difficult to move
*/
void
__attribute__((noreturn)) void
worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox)
{
assert(exiting_sandbox);
@ -237,4 +239,6 @@ worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox)
/* This should force return to main event loop */
worker_thread_switch_to_sandbox(NULL);
assert(0);
}

Loading…
Cancel
Save