chore: port simple doc and error handlign

sledge_graph
Sean McBride 5 years ago
parent a2319f2a68
commit d1f80d8b1e

@ -18,7 +18,7 @@ struct arch_context {
mcontext_t mctx; mcontext_t mctx;
}; };
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void); extern void __attribute__((noreturn)) worker_thread_mcontext_restore(void);
extern __thread struct arch_context worker_thread_base_context; extern __thread struct arch_context worker_thread_base_context;
/* Initialized a context, zeroing out registers and setting the Instruction and Stack pointers */ /* Initialized a context, zeroing out registers and setting the Instruction and Stack pointers */
@ -104,7 +104,7 @@ arch_context_switch(struct arch_context *ca, struct arch_context *na)
".align 8\n\t" ".align 8\n\t"
"exit%=:\n\t" "exit%=:\n\t"
: :
: [ curr ] "r"(cr), [ next ] "r"(nr), [ slowpath ] "r"(&worker_thread_sandbox_switch_preempt) : [ curr ] "r"(cr), [ next ] "r"(nr), [ slowpath ] "r"(&worker_thread_mcontext_restore)
: "memory", "cc", "x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", : "memory", "cc", "x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12",
"x13", "x14", "x15", "x16", "x17", "x18", "x19", "x20", "x21", "x22", "x23", "x24", "x25", "x26", "x13", "x14", "x15", "x16", "x17", "x18", "x19", "x20", "x21", "x22", "x23", "x24", "x25", "x26",
"d8", "d9", "d10", "d11", "d12", "d13", "d14", "d15"); "d8", "d9", "d10", "d11", "d12", "d13", "d14", "d15");

@ -31,7 +31,7 @@ struct arch_context {
mcontext_t mctx; mcontext_t mctx;
}; };
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void); extern void __attribute__((noreturn)) worker_thread_mcontext_restore(void);
extern __thread struct arch_context worker_thread_base_context; extern __thread struct arch_context worker_thread_base_context;
static void __attribute__((noinline)) arch_context_init(struct arch_context *actx, reg_t ip, reg_t sp) static void __attribute__((noinline)) arch_context_init(struct arch_context *actx, reg_t ip, reg_t sp)
@ -44,14 +44,15 @@ static void __attribute__((noinline)) arch_context_init(struct arch_context *act
* context_switch conventions: bp is expected to be on top of the stack * context_switch conventions: bp is expected to be on top of the stack
* when co-op context switching.. * when co-op context switching..
* *
* so push sp on this new stack and use * Temporarily switches the active stack to the stack pointer stored in sp
* that new sp as sp for switching to sandbox! * to push the stack pointer sp to the top of its own stack.
* This acts as the base pointer
*/ */
asm volatile("movq %%rsp, %%rbx\n\t" asm volatile("movq %%rsp, %%rbx\n\t" /* Temporarily save pointer of active stack to B */
"movq %%rax, %%rsp\n\t" "movq %%rax, %%rsp\n\t" /* Set active stack to stack pointer in A(C variable sp) */
"pushq %%rax\n\t" "pushq %%rax\n\t" /* Push A(C variable sp) onto the stack at sp */
"movq %%rsp, %%rax\n\t" "movq %%rsp, %%rax\n\t" /* Write the incremented stack pointer to A(C variable sp) */
"movq %%rbx, %%rsp\n\t" "movq %%rbx, %%rsp\n\t" /* Restore original stack saved in B */
: "=a"(sp) : "=a"(sp)
: "a"(sp) : "a"(sp)
: "memory", "cc", "rbx"); : "memory", "cc", "rbx");
@ -122,6 +123,9 @@ arch_context_switch(struct arch_context *current, struct arch_context *next)
/* if both current and next are NULL, there is no state change */ /* if both current and next are NULL, there is no state change */
assert(current != NULL || next != NULL); assert(current != NULL || next != NULL);
/* Assumption: The caller does not switch to itself */
assert(current != next);
/* Set any NULLs to worker_thread_base_context to resume execution of main */ /* Set any NULLs to worker_thread_base_context to resume execution of main */
if (current == NULL) current = &worker_thread_base_context; if (current == NULL) current = &worker_thread_base_context;
if (next == NULL) next = &worker_thread_base_context; if (next == NULL) next = &worker_thread_base_context;
@ -129,27 +133,58 @@ arch_context_switch(struct arch_context *current, struct arch_context *next)
reg_t *current_registers = current->regs, *next_registers = next->regs; reg_t *current_registers = current->regs, *next_registers = next->regs;
assert(current_registers && next_registers); assert(current_registers && next_registers);
asm volatile("pushq %%rbp\n\t" asm volatile(
"movq %%rsp, %%rbp\n\t" /* Create a new stack frame */
"movq $2f, 8(%%rax)\n\t" "pushq %%rbp\n\t" /* stack[stack_len++] = base_pointer */
"movq %%rsp, (%%rax)\n\t" "movq %%rsp, %%rbp\n\t" /* base_pointer = stack_pointer. Start new Frame */
"cmpq $0, (%%rbx)\n\t"
"je 1f\n\t" /*
"movq (%%rbx), %%rsp\n\t" * Save the IP and stack pointer to the context of the sandbox we're switching from
"jmpq *8(%%rbx)\n\t" */
"movq $2f, 8(%%rax)\n\t" /* Write the address of label 2 to current_registers[1] (instruction_pointer). */
"movq %%rsp, (%%rax)\n\t" /* current_registers[0] (stack_pointer) = stack_pointer */
/*
* Check if the variant of the context we're trying to switch to is SLOW (mcontext-based)
* If it is, jump to label 1 to restore the preempted sandbox
*/
"cmpq $0, (%%rbx)\n\t" /* if (stack pointer == 0) */
"je 1f\n\t" /* goto 1; restore the existing sandbox using mcontext */
/*
* Fast Path
* We can just write update the stack pointer and jump to the target instruction
*/
"movq (%%rbx), %%rsp\n\t" /* stack_pointer = next_registers[0] (stack_pointer) */
"jmpq *8(%%rbx)\n\t" /* immediate jump to next_registers[1] (instruction_pointer) */
/*
* Slow Path
* If the stack pointer equaled 0, that means the sandbox was preempted and we need to
* fallback to a full mcontext-based context switch. We do this by invoking
* worker_thread_mcontext_restore, which fires a SIGUSR1 signal. The SIGUSR1 signal handler
* executes the mcontext-based context switch.
*/
"1:\n\t" "1:\n\t"
"call worker_thread_sandbox_switch_preempt\n\t" "call worker_thread_mcontext_restore\n\t"
".align 8\n\t" ".align 8\n\t"
/*
* Where preempted sandbox resumes
* rbx contains the preempted sandbox's IP and SP in this context
*/
"2:\n\t" "2:\n\t"
"movq $0, (%%rbx)\n\t" "movq $0, (%%rbx)\n\t" /* stack pointer = 0 */
".align 8\n\t" ".align 8\n\t"
/* This label is used in conjunction with a static offset */
"3:\n\t" "3:\n\t"
"popq %%rbp\n\t" "popq %%rbp\n\t" /* base_pointer = stack[--stack_len]; Base Pointer is restored */
: :
: "a"(current_registers), "b"(next_registers) : "a"(current_registers), "b"(next_registers)
: "memory", "cc", "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", : "memory", "cc", "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", "xmm0",
"xmm0", "xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7", "xmm8", "xmm9", "xmm10", "xmm11", "xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7", "xmm8", "xmm9", "xmm10", "xmm11", "xmm12", "xmm13",
"xmm12", "xmm13", "xmm14", "xmm15"); "xmm14", "xmm15");
return 0; return 0;
} }

@ -44,9 +44,10 @@ struct sandbox {
struct arch_context ctxt; /* register context for context switch. */ struct arch_context ctxt; /* register context for context switch. */
uint64_t total_time; uint64_t request_timestamp;
uint64_t start_time;
uint64_t absolute_deadline; uint64_t absolute_deadline;
uint64_t total_time;
struct module *module; /* the module this is an instance of */ struct module *module; /* the module this is an instance of */
@ -85,7 +86,7 @@ extern __thread struct arch_context *worker_thread_next_context;
extern void worker_thread_block_current_sandbox(void); extern void worker_thread_block_current_sandbox(void);
extern void worker_thread_on_sandbox_exit(struct sandbox *sandbox); extern void worker_thread_on_sandbox_exit(struct sandbox *sandbox);
extern void worker_thread_process_io(void); extern void worker_thread_process_io(void);
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void); extern void __attribute__((noreturn)) worker_thread_mcontext_restore(void);
extern void worker_thread_wakeup_sandbox(struct sandbox *sandbox); extern void worker_thread_wakeup_sandbox(struct sandbox *sandbox);
/*************************** /***************************

@ -14,7 +14,7 @@ struct sandbox_request {
char * arguments; char * arguments;
int socket_descriptor; int socket_descriptor;
struct sockaddr *socket_address; struct sockaddr *socket_address;
uint64_t start_time; /* cycles */ uint64_t request_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */ uint64_t absolute_deadline; /* cycles */
}; };
@ -26,12 +26,12 @@ DEQUE_PROTOTYPE(sandbox, struct sandbox_request *);
* @param arguments the arguments that we'll pass to the serverless function * @param arguments the arguments that we'll pass to the serverless function
* @param socket_descriptor * @param socket_descriptor
* @param socket_address * @param socket_address
* @param start_time the timestamp of when we receives the request from the network (in cycles) * @param request_timestamp the timestamp of when we receives the request from the network (in cycles)
* @return the new sandbox request * @return the new sandbox request
*/ */
static inline struct sandbox_request * static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor, sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor,
const struct sockaddr *socket_address, uint64_t start_time) const struct sockaddr *socket_address, uint64_t request_timestamp)
{ {
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request)); struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
assert(sandbox_request); assert(sandbox_request);
@ -39,8 +39,9 @@ sandbox_request_allocate(struct module *module, char *arguments, int socket_desc
sandbox_request->arguments = arguments; sandbox_request->arguments = arguments;
sandbox_request->socket_descriptor = socket_descriptor; sandbox_request->socket_descriptor = socket_descriptor;
sandbox_request->socket_address = (struct sockaddr *)socket_address; sandbox_request->socket_address = (struct sockaddr *)socket_address;
sandbox_request->start_time = start_time; sandbox_request->request_timestamp = request_timestamp;
sandbox_request->absolute_deadline = start_time + module->relative_deadline_us * runtime_processor_speed_MHz; sandbox_request->absolute_deadline = request_timestamp
+ module->relative_deadline_us * runtime_processor_speed_MHz;
debuglog("[%p: %s]\n", sandbox_request, sandbox_request->module->name); debuglog("[%p: %s]\n", sandbox_request, sandbox_request->module->name);
return sandbox_request; return sandbox_request;

@ -55,13 +55,13 @@ local_runqueue_minheap_delete(struct sandbox *sandbox)
int rc = priority_queue_delete(&local_runqueue_minheap, sandbox); int rc = priority_queue_delete(&local_runqueue_minheap, sandbox);
if (rc == -1) { if (rc == -1) {
panic("Err: Thread Local %lu tried to delete sandbox %lu from runqueue, but was not present\n", panic("Err: Thread Local %lu tried to delete sandbox %lu from runqueue, but was not present\n",
pthread_self(), sandbox->start_time); pthread_self(), sandbox->request_timestamp);
} }
} }
/** /**
* This function determines the next sandbox to run. This is either the head of the runqueue or the head of the request * This function determines the next sandbox to run.
*queue * This is either the head of the runqueue or the head of the request queue
* *
* Execute the sandbox at the head of the thread local runqueue * Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
@ -100,14 +100,19 @@ local_runqueue_minheap_get_next()
/** /**
* Conditionally checks to see if current sandbox should be preempted * Called by the SIGALRM handler after a quantum
* Assumes the caller validates that there is something to preempt
* @param user_context - The context of our user-level Worker thread
*/ */
void void
local_runqueue_minheap_preempt(ucontext_t *user_context) local_runqueue_minheap_preempt(ucontext_t *user_context)
{ {
assert(user_context != NULL);
software_interrupt_disable(); /* no nesting! */ software_interrupt_disable(); /* no nesting! */
struct sandbox *current_sandbox = current_sandbox_get(); struct sandbox *current_sandbox = current_sandbox_get();
/* If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course. */ /* If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course. */
if (current_sandbox == NULL) { if (current_sandbox == NULL) {
software_interrupt_enable(); software_interrupt_enable();
@ -117,10 +122,6 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* The current sandbox should be the head of the runqueue */ /* The current sandbox should be the head of the runqueue */
assert(local_runqueue_minheap_is_empty() == false); assert(local_runqueue_minheap_is_empty() == false);
// TODO: Factor quantum and/or sandbox allocation time into decision
// uint64_t global_deadline = global_request_scheduler_peek() -
// software_interrupt_interval_duration_in_cycles;
bool should_enable_software_interrupt = true; bool should_enable_software_interrupt = true;
uint64_t local_deadline = priority_queue_peek(&local_runqueue_minheap); uint64_t local_deadline = priority_queue_peek(&local_runqueue_minheap);
uint64_t global_deadline = global_request_scheduler_peek(); uint64_t global_deadline = global_request_scheduler_peek();
@ -128,9 +129,17 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* Our local deadline should only be ULONG_MAX if our local runqueue is empty */ /* Our local deadline should only be ULONG_MAX if our local runqueue is empty */
if (local_deadline == ULONG_MAX) { assert(local_runqueue_minheap.first_free == 1); }; if (local_deadline == ULONG_MAX) { assert(local_runqueue_minheap.first_free == 1); };
/* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */ /*
struct sandbox_request *sandbox_request; * If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it
*
* TODO: Factor quantum and/or sandbox allocation time into decision
* Something like global_request_scheduler_peek() - software_interrupt_interval_duration_in_cycles;
*/
if (global_deadline < local_deadline) { if (global_deadline < local_deadline) {
debuglog("Thread %lu | Sandbox %lu | Had deadline of %lu. Trying to preempt for request with %lu\n",
pthread_self(), current_sandbox->allocation_timestamp, local_deadline, global_deadline);
struct sandbox_request *sandbox_request; struct sandbox_request *sandbox_request;
int return_code = global_request_scheduler_remove(&sandbox_request); int return_code = global_request_scheduler_remove(&sandbox_request);
@ -155,9 +164,11 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* Update current_sandbox to the next sandbox */ /* Update current_sandbox to the next sandbox */
current_sandbox_set(next_sandbox); current_sandbox_set(next_sandbox);
/* And load the context of this new sandbox /*
RC of 1 indicates that sandbox was last in a user-level context switch state, * And load the context of this new sandbox
so do not enable software interrupts. */ * RC of 1 indicates that sandbox was last in a user-level context switch state,
* so do not enable software interrupts.
*/
if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1) if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1)
should_enable_software_interrupt = false; should_enable_software_interrupt = false;
} }

@ -74,7 +74,7 @@ listener_thread_main(void *dummy)
LISTENER_THREAD_MAX_EPOLL_EVENTS, -1); LISTENER_THREAD_MAX_EPOLL_EVENTS, -1);
/* Capture Start Time to calculate absolute deadline */ /* Capture Start Time to calculate absolute deadline */
uint64_t start_time = __getcycles(); uint64_t request_timestamp = __getcycles();
for (int i = 0; i < request_count; i++) { for (int i = 0; i < request_count; i++) {
if (epoll_events[i].events & EPOLLERR) { if (epoll_events[i].events & EPOLLERR) {
perror("epoll_wait"); perror("epoll_wait");
@ -97,7 +97,7 @@ listener_thread_main(void *dummy)
/* Allocate a Sandbox Request */ /* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request = struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, module->name, socket_descriptor, sandbox_request_allocate(module, module->name, socket_descriptor,
(const struct sockaddr *)&client_address, start_time); (const struct sockaddr *)&client_address, request_timestamp);
assert(sandbox_request); assert(sandbox_request);
/* Add to the Global Sandbox Request Scheduler */ /* Add to the Global Sandbox Request Scheduler */

@ -79,7 +79,8 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
r = recv(sandbox->client_socket_descriptor, (sandbox->request_response_data), sandbox->module->max_request_size, r = recv(sandbox->client_socket_descriptor, (sandbox->request_response_data), sandbox->module->max_request_size,
0); 0);
if (r <= 0) { if (r <= 0) {
if (r < 0) perror("recv1"); if (r < 0) perror("Error reading request data from client socket");
if (r == 0) perror("No data to reach from client socket");
return r; return r;
} }
while (r > 0) { while (r > 0) {
@ -101,7 +102,10 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
libuv_callbacks_on_allocate_setup_request_response_data, libuv_callbacks_on_allocate_setup_request_response_data,
libuv_callbacks_on_read_parse_http_request); libuv_callbacks_on_read_parse_http_request);
worker_thread_process_io(); worker_thread_process_io();
if (sandbox->request_response_data_length == 0) return 0; if (sandbox->request_response_data_length == 0) {
perror("request_response_data_length was unexpectedly 0");
return 0
};
#endif #endif
return 1; return 1;
} }
@ -147,7 +151,7 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox)
done: done:
assert(sndsz == sandbox->request_response_data_length); assert(sndsz == sandbox->request_response_data_length);
uint64_t end_time = __getcycles(); uint64_t end_time = __getcycles();
sandbox->total_time = end_time - sandbox->start_time; sandbox->total_time = end_time - sandbox->request_timestamp;
uint64_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz; uint64_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz;
debuglog("%s():%d, %u, %lu\n", sandbox->module->name, sandbox->module->port, debuglog("%s():%d, %u, %lu\n", sandbox->module->name, sandbox->module->port,
@ -244,6 +248,8 @@ current_sandbox_main(void)
assert(sandbox != NULL); assert(sandbox != NULL);
assert(sandbox->state == SANDBOX_RUNNABLE); assert(sandbox->state == SANDBOX_RUNNABLE);
char *error_message = "";
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
arch_context_init(&sandbox->ctxt, 0, 0); arch_context_init(&sandbox->ctxt, 0, 0);
worker_thread_next_context = NULL; worker_thread_next_context = NULL;
@ -255,7 +261,10 @@ current_sandbox_main(void)
/* Parse the request. 1 = Success */ /* Parse the request. 1 = Success */
int rc = sandbox_receive_and_parse_client_request(sandbox); int rc = sandbox_receive_and_parse_client_request(sandbox);
if (rc != 1) goto done; if (rc != 1) {
error_message = "Unable to receive and parse client request";
goto err;
};
/* Initialize the module */ /* Initialize the module */
struct module *current_module = sandbox_get_module(sandbox); struct module *current_module = sandbox_get_module(sandbox);
@ -271,7 +280,11 @@ current_sandbox_main(void)
sandbox->return_value = module_main(current_module, argument_count, sandbox->arguments_offset); sandbox->return_value = module_main(current_module, argument_count, sandbox->arguments_offset);
/* Retrieve the result, construct the HTTP response, and send to client */ /* Retrieve the result, construct the HTTP response, and send to client */
sandbox_build_and_send_client_response(sandbox); rc = sandbox_build_and_send_client_response(sandbox);
if (rc == -1) {
error_message = "Unable to build and send client response";
goto err;
};
done: done:
/* Cleanup connection and exit sandbox */ /* Cleanup connection and exit sandbox */
@ -282,6 +295,9 @@ done:
* https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66 * https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66
*/ */
assert(0); assert(0);
err:
perror(error_message);
goto done;
} }
/** /**
@ -371,27 +387,32 @@ sandbox_allocate(struct sandbox_request *sandbox_request)
assert(sandbox_request->module != NULL); assert(sandbox_request->module != NULL);
assert(module_is_valid(sandbox_request->module)); assert(module_is_valid(sandbox_request->module));
char * error_message = NULL; char * error_message = "";
int rc; int rc;
struct sandbox *sandbox = NULL; struct sandbox *sandbox = NULL;
/* Allocate Sandbox control structures, buffers, and linear memory in a 4GB address space */ /* Allocate Sandbox control structures, buffers, and linear memory in a 4GB address space */
errno = 0;
sandbox = (struct sandbox *)sandbox_allocate_memory(sandbox_request->module); sandbox = (struct sandbox *)sandbox_allocate_memory(sandbox_request->module);
if (!sandbox) goto err_memory_allocation_failed; if (!sandbox) {
error_message = "failed to allocate sandbox heap and linear memory";
goto err_memory_allocation_failed;
}
/* Set state to initializing */ /* Set state to initializing */
sandbox->state = SANDBOX_INITIALIZING; sandbox->state = SANDBOX_INITIALIZING;
/* Allocate the Stack */ /* Allocate the Stack */
rc = sandbox_allocate_stack(sandbox); rc = sandbox_allocate_stack(sandbox);
if (rc != 0) goto err_stack_allocation_failed; if (rc != 0) {
error_message = "failed to allocate sandbox heap and linear memory";
goto err_stack_allocation_failed;
}
/* Copy the socket descriptor, address, and arguments of the client invocation */ /* Copy the socket descriptor, address, and arguments of the client invocation */
sandbox->absolute_deadline = sandbox_request->absolute_deadline; sandbox->absolute_deadline = sandbox_request->absolute_deadline;
sandbox->arguments = (void *)sandbox_request->arguments; sandbox->arguments = (void *)sandbox_request->arguments;
sandbox->client_socket_descriptor = sandbox_request->socket_descriptor; sandbox->client_socket_descriptor = sandbox_request->socket_descriptor;
sandbox->start_time = sandbox_request->start_time; sandbox->request_timestamp = sandbox_request->request_timestamp;
/* Initialize the sandbox's context, stack, and instruction pointer */ /* Initialize the sandbox's context, stack, and instruction pointer */
arch_context_init(&sandbox->ctxt, (reg_t)current_sandbox_main, arch_context_init(&sandbox->ctxt, (reg_t)current_sandbox_main,
@ -415,6 +436,7 @@ err_stack_allocation_failed:
err_memory_allocation_failed: err_memory_allocation_failed:
err: err:
perror(error_message); perror(error_message);
sandbox = NULL;
goto done; goto done;
} }

@ -11,6 +11,7 @@
#include "current_sandbox.h" #include "current_sandbox.h"
#include "local_runqueue.h" #include "local_runqueue.h"
#include "module.h" #include "module.h"
#include "panic.h"
#include "runtime.h" #include "runtime.h"
#include "sandbox.h" #include "sandbox.h"
#include "software_interrupt.h" #include "software_interrupt.h"
@ -64,53 +65,63 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
case SIGALRM: { case SIGALRM: {
/* SIGALRM is the preemption signal that occurs every quantum of execution */ /* SIGALRM is the preemption signal that occurs every quantum of execution */
/* A POSIX signal is delivered to one of the threads in our process.If sent by the kernel, "broadcast" /*
* by forwarding to all all threads */ * A POSIX signal is delivered to only one thread.
* We need to ensure this thread broadcasts to all other threads
*/
if (signal_info->si_code == SI_KERNEL) { if (signal_info->si_code == SI_KERNEL) {
/* Signal was sent directly by the kernel, so forward to other threads */
for (int i = 0; i < runtime_total_worker_processors; i++) { for (int i = 0; i < runtime_total_worker_processors; i++) {
if (pthread_self() != runtime_worker_threads[i]) { if (pthread_self() != runtime_worker_threads[i]) {
pthread_kill(runtime_worker_threads[i], SIGALRM); pthread_kill(runtime_worker_threads[i], SIGALRM);
} }
} }
} else { } else {
/* If not sent by the kernel, this should be a signal forwarded from another thread */ /* Signal forwarded from another thread. Just confirm it resulted from pthread_kill */
assert(signal_info->si_code == SI_TKILL); assert(signal_info->si_code == SI_TKILL);
} }
// debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
software_interrupt_SIGALRM_count++; software_interrupt_SIGALRM_count++;
/* if the current sandbox is NULL or not in a returned state */ /* NOOP if software interrupts not enabled */
if (current_sandbox && current_sandbox->state == SANDBOX_RETURNED) return;
/* and the next context is NULL */
if (worker_thread_next_context) return;
/* and software interrupts are not disabled */
if (!software_interrupt_is_enabled()) return; if (!software_interrupt_is_enabled()) return;
/* Do not allow more than one layer of preemption */
if (worker_thread_next_context) return;
/* if the current sandbox is NULL or in a RETURNED state, nothing to preempt, so just return */
if (!current_sandbox) return;
if (current_sandbox->state == SANDBOX_RETURNED) return;
/* Preempt */ /* Preempt */
local_runqueue_preempt(user_context); local_runqueue_preempt(user_context);
return; return;
} }
case SIGUSR1: { /* SIGUSR1 restores the preempted sandbox stored in worker_thread_next_context. */ case SIGUSR1: {
/* Make sure *sigalrm doesn't mess this up if nested.. */ /* SIGUSR1 restores a preempted sandbox using mcontext. */
/* Assumption: Always sent by a thread, never by the kernel */
assert(signal_info->si_code == SI_TKILL);
/* Assumption: Caller disables interrupt before triggering SIGUSR1 */
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
/* we set current before calling pthread_kill! */
/* Assumption: Caller sets current_sandbox to the preempted sandbox */
assert(worker_thread_next_context && (&current_sandbox->ctxt == worker_thread_next_context)); assert(worker_thread_next_context && (&current_sandbox->ctxt == worker_thread_next_context));
assert(signal_info->si_code == SI_TKILL);
// debuglog("usr1:%d\n", software_interrupt_SIGUSR_count);
software_interrupt_SIGUSR_count++; software_interrupt_SIGUSR_count++;
/* do not save current sandbox.. it is in co-operative switch.. debuglog("usr1:%d\n", software_interrupt_SIGUSR_count);
pick the next from "worker_thread_next_context"..
assert its "sp" to be zero in regs..
memcpy from next context.. */
arch_mcontext_restore(&user_context->uc_mcontext, &current_sandbox->ctxt); arch_mcontext_restore(&user_context->uc_mcontext, &current_sandbox->ctxt);
worker_thread_next_context = NULL; worker_thread_next_context = NULL;
software_interrupt_enable(); software_interrupt_enable();
break;
return;
} }
default: default:
break; panic("Handler unexpectedly called for signal other than SIGALRM or SIGUSR1\n");
} }
#endif #endif
} }

@ -30,7 +30,7 @@ __thread struct arch_context worker_thread_base_context;
__thread uv_loop_t worker_thread_uvio_handle; __thread uv_loop_t worker_thread_uvio_handle;
/* Flag to signify if the thread is currently running callbacks in the libuv event loop */ /* Flag to signify if the thread is currently running callbacks in the libuv event loop */
static __thread bool worker_thread_is_in_callback; static __thread bool worker_thread_is_in_libuv_event_loop = false;
/*********************** /***********************
* Worker Thread Logic * * Worker Thread Logic *
@ -84,8 +84,10 @@ worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
void void
worker_thread_wakeup_sandbox(struct sandbox *sandbox) worker_thread_wakeup_sandbox(struct sandbox *sandbox)
{ {
assert(sandbox != NULL);
assert(sandbox->state == SANDBOX_BLOCKED);
software_interrupt_disable(); software_interrupt_disable();
if (sandbox->state != SANDBOX_BLOCKED) goto done;
sandbox->state = SANDBOX_RUNNABLE; sandbox->state = SANDBOX_RUNNABLE;
debuglog("Marking blocked sandbox as runnable\n"); debuglog("Marking blocked sandbox as runnable\n");
@ -103,7 +105,7 @@ done:
void void
worker_thread_block_current_sandbox(void) worker_thread_block_current_sandbox(void)
{ {
assert(worker_thread_is_in_callback == false); assert(worker_thread_is_in_libuv_event_loop == false);
software_interrupt_disable(); software_interrupt_disable();
/* Remove the sandbox we were just executing from the runqueue and mark as blocked */ /* Remove the sandbox we were just executing from the runqueue and mark as blocked */
@ -145,13 +147,12 @@ worker_thread_process_io(void)
* sigreturn. To get to sigreturn, we need to send ourselves a signal, then update the registers we should return to, * sigreturn. To get to sigreturn, we need to send ourselves a signal, then update the registers we should return to,
* then sigreturn (by returning from the handler). * then sigreturn (by returning from the handler).
*/ */
void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void) void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_mcontext_restore(void)
{ {
debuglog("Thread %lu | Signaling SIGUSR1 on self to initiate mcontext restore...\n", pthread_self());
pthread_kill(pthread_self(), SIGUSR1); pthread_kill(pthread_self(), SIGUSR1);
assert(false); /* should not get here.. */ assert(false); /* should not get here.. */
while (true)
;
} }
/** /**
@ -160,13 +161,13 @@ void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_s
void void
worker_thread_execute_libuv_event_loop(void) worker_thread_execute_libuv_event_loop(void)
{ {
worker_thread_is_in_callback = true; worker_thread_is_in_libuv_event_loop = true;
int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT), i = 0; int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT), i = 0;
while (n > 0) { while (n > 0) {
n--; n--;
uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT); uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT);
} }
worker_thread_is_in_callback = false; worker_thread_is_in_libuv_event_loop = false;
} }
/** /**
@ -177,27 +178,40 @@ worker_thread_execute_libuv_event_loop(void)
void * void *
worker_thread_main(void *return_code) worker_thread_main(void *return_code)
{ {
/* Initialize Worker Infrastructure */ /* Initialize Base Context */
arch_context_init(&worker_thread_base_context, 0, 0); arch_context_init(&worker_thread_base_context, 0, 0);
/* Initialize Runqueue Variant */
// local_runqueue_list_initialize(); // local_runqueue_list_initialize();
local_runqueue_minheap_initialize(); local_runqueue_minheap_initialize();
/* Initialize Completion Queue */
local_completion_queue_initialize(); local_completion_queue_initialize();
/* Initialize Flags */
software_interrupt_is_disabled = false; software_interrupt_is_disabled = false;
worker_thread_is_in_libuv_event_loop = false;
worker_thread_next_context = NULL; worker_thread_next_context = NULL;
/* Unmask signals */
#ifndef PREEMPT_DISABLE #ifndef PREEMPT_DISABLE
software_interrupt_unmask_signal(SIGALRM); software_interrupt_unmask_signal(SIGALRM);
software_interrupt_unmask_signal(SIGUSR1); software_interrupt_unmask_signal(SIGUSR1);
#endif #endif
/* Initialize libuv event loop handle */
uv_loop_init(&worker_thread_uvio_handle); uv_loop_init(&worker_thread_uvio_handle);
worker_thread_is_in_callback = false;
/* Begin Worker Execution Loop */ /* Begin Worker Execution Loop */
struct sandbox *next_sandbox; struct sandbox *next_sandbox;
while (true) { while (true) {
/* Assumption: current_sandbox should be unset at start of loop */
assert(current_sandbox_get() == NULL); assert(current_sandbox_get() == NULL);
/* If "in a callback", the libuv event loop is triggering this, so we don't need to start it */
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
/* Execute libuv event loop */
if (!worker_thread_is_in_libuv_event_loop) worker_thread_execute_libuv_event_loop();
/* Switch to a sandbox if one is ready to run */
software_interrupt_disable(); software_interrupt_disable();
next_sandbox = local_runqueue_get_next(); next_sandbox = local_runqueue_get_next();
if (next_sandbox != NULL) { if (next_sandbox != NULL) {
@ -206,6 +220,7 @@ worker_thread_main(void *return_code)
software_interrupt_enable(); software_interrupt_enable();
}; };
/* Clear the completion queue */
local_completion_queue_free(); local_completion_queue_free();
} }
@ -215,10 +230,10 @@ worker_thread_main(void *return_code)
/** /**
* Called when the function in the sandbox exits * Called when the function in the sandbox exits
* Removes the standbox from the thread-local runqueue, sets its state to SANDBOX_RETURNED, * Removes the standbox from the thread-local runqueue, sets its state to SANDBOX_RETURNED,
* releases the linear memory, and then switches to the sandbox at the head of the runqueue * releases the linear memory, and then returns to the base context
* TODO: Consider moving this to a future current_sandbox file. This has thus far proven difficult to move * TODO: Consider moving this to a future current_sandbox file. This has thus far proven difficult to move
*/ */
void __attribute__((noreturn)) void
worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox) worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox)
{ {
assert(exiting_sandbox); assert(exiting_sandbox);
@ -237,4 +252,6 @@ worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox)
/* This should force return to main event loop */ /* This should force return to main event loop */
worker_thread_switch_to_sandbox(NULL); worker_thread_switch_to_sandbox(NULL);
assert(0);
} }

Loading…
Cancel
Save