chore: break up runtime and worker_thread

main
Sean McBride 5 years ago
parent 4d29585236
commit 83ee42e89b

@ -1,6 +1,8 @@
#ifndef SRFT_HTTP_PARSER_SETTINGS_H #ifndef SRFT_HTTP_PARSER_SETTINGS_H
#define SRFT_HTTP_PARSER_SETTINGS_H #define SRFT_HTTP_PARSER_SETTINGS_H
#include "http_parser.h"
void http_parser_settings_initialize(void); void http_parser_settings_initialize(void);
http_parser_settings *http_parser_settings_get(); http_parser_settings *http_parser_settings_get();

@ -2,14 +2,9 @@
#define SFRT_RUNTIME_H #define SFRT_RUNTIME_H
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event #include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
#include <uv.h>
#include "module.h"
#include "sandbox.h"
#include "types.h" #include "types.h"
extern int runtime_epoll_file_descriptor; extern int runtime_epoll_file_descriptor;
extern __thread uv_loop_t worker_thread_uvio_handle;
void alloc_linear_memory(void); void alloc_linear_memory(void);
void expand_memory(void); void expand_memory(void);
@ -19,55 +14,6 @@ INLINE char *get_memory_ptr_for_runtime(u32 offset, u32 bounds_check);
void runtime_initialize(void); void runtime_initialize(void);
void listener_thread_initialize(void); void listener_thread_initialize(void);
void stub_init(i32 offset); void stub_init(i32 offset);
void * worker_thread_main(void *return_code);
/**
* Translates WASM offsets into runtime VM pointers
* @param offset an offset into the WebAssembly linear memory
* @param bounds_check the size of the thing we are pointing to
* @return void pointer to something in WebAssembly linear memory
**/
static inline void *
worker_thread_get_memory_ptr_void(u32 offset, u32 bounds_check)
{
return (void *)get_memory_ptr_for_runtime(offset, bounds_check);
}
/**
* Get a single-byte extended ASCII character from WebAssembly linear memory
* @param offset an offset into the WebAssembly linear memory
* @return char at the offset
**/
static inline char
worker_thread_get_memory_character(u32 offset)
{
return get_memory_ptr_for_runtime(offset, 1)[0];
}
/**
* Get a null-terminated String from WebAssembly linear memory
* @param offset an offset into the WebAssembly linear memory
* @param max_length the maximum expected length in characters
* @return pointer to the string or NULL if max_length is reached without finding null-terminator
**/
static inline char *
worker_thread_get_memory_string(u32 offset, u32 max_length)
{
for (int i = 0; i < max_length; i++) {
if (worker_thread_get_memory_character(offset + i) == '\0')
return worker_thread_get_memory_ptr_void(offset, 1);
}
return NULL;
}
/**
* Get global libuv handle
**/
static inline uv_loop_t *
worker_thread_get_libuv_handle(void)
{
return &worker_thread_uvio_handle;
}
unsigned long long __getcycles(void); unsigned long long __getcycles(void);

@ -1,9 +1,12 @@
#ifndef SFRT_SANDBOX_REQUEST_H #ifndef SFRT_SANDBOX_REQUEST_H
#define SFRT_SANDBOX_REQUEST_H #define SFRT_SANDBOX_REQUEST_H
#include <stdbool.h>
#include "deque.h" #include "deque.h"
#include "types.h" #include "module.h"
#include "runtime.h" #include "runtime.h"
#include "types.h"
extern float runtime_processor_speed_MHz; extern float runtime_processor_speed_MHz;

@ -0,0 +1,59 @@
#ifndef SFRT_WORKER_THREAD_H
#define SFRT_WORKER_THREAD_H
#include <uv.h>
#include "types.h"
extern __thread uv_loop_t worker_thread_uvio_handle;
void *worker_thread_main(void *return_code);
/**
* Translates WASM offsets into runtime VM pointers
* @param offset an offset into the WebAssembly linear memory
* @param bounds_check the size of the thing we are pointing to
* @return void pointer to something in WebAssembly linear memory
**/
static inline void *
worker_thread_get_memory_ptr_void(u32 offset, u32 bounds_check)
{
return (void *)get_memory_ptr_for_runtime(offset, bounds_check);
}
/**
* Get a single-byte extended ASCII character from WebAssembly linear memory
* @param offset an offset into the WebAssembly linear memory
* @return char at the offset
**/
static inline char
worker_thread_get_memory_character(u32 offset)
{
return get_memory_ptr_for_runtime(offset, 1)[0];
}
/**
* Get a null-terminated String from WebAssembly linear memory
* @param offset an offset into the WebAssembly linear memory
* @param max_length the maximum expected length in characters
* @return pointer to the string or NULL if max_length is reached without finding null-terminator
**/
static inline char *
worker_thread_get_memory_string(u32 offset, u32 max_length)
{
for (int i = 0; i < max_length; i++) {
if (worker_thread_get_memory_character(offset + i) == '\0')
return worker_thread_get_memory_ptr_void(offset, 1);
}
return NULL;
}
/**
* Get global libuv handle
**/
static inline uv_loop_t *
worker_thread_get_libuv_handle(void)
{
return &worker_thread_uvio_handle;
}
#endif /* SFRT_WORKER_THREAD_H */

@ -1,5 +1,6 @@
/* https://github.com/gwsystems/silverfish/blob/master/runtime/libc/libc_backing.c */ /* https://github.com/gwsystems/silverfish/blob/master/runtime/libc/libc_backing.c */
#include <runtime.h> #include <runtime.h>
#include <worker_thread.h>
#include <ck_pr.h> #include <ck_pr.h>
extern i32 inner_syscall_handler(i32 n, i32 a, i32 b, i32 c, i32 d, i32 e, i32 f); extern i32 inner_syscall_handler(i32 n, i32 a, i32 b, i32 c, i32 d, i32 e, i32 f);

@ -1,4 +1,5 @@
#include <runtime.h> #include <runtime.h>
#include <worker_thread.h>
#include <sandbox.h> #include <sandbox.h>
#include <uv.h> #include <uv.h>
#include <http_request.h> #include <http_request.h>
@ -522,9 +523,9 @@ wasm_readv(i32 file_descriptor, i32 iov_offset, i32 iovcnt)
struct wasm_iovec *iov = worker_thread_get_memory_ptr_void(iov_offset, iovcnt * sizeof(struct wasm_iovec)); struct wasm_iovec *iov = worker_thread_get_memory_ptr_void(iov_offset, iovcnt * sizeof(struct wasm_iovec));
for (int i = 0; i < iovcnt; i += RUNTIME_READ_WRITE_VECTOR_LENGTH) { for (int i = 0; i < iovcnt; i += RUNTIME_READ_WRITE_VECTOR_LENGTH) {
uv_fs_t req = UV_FS_REQ_INIT(); uv_fs_t req = UV_FS_REQ_INIT();
uv_buf_t bufs[RUNTIME_READ_WRITE_VECTOR_LENGTH] = { 0 }; // avoid mallocs here! uv_buf_t bufs[RUNTIME_READ_WRITE_VECTOR_LENGTH] = { 0 }; // avoid mallocs here!
int j = 0; int j = 0;
for (j = 0; j < RUNTIME_READ_WRITE_VECTOR_LENGTH && i + j < iovcnt; j++) { for (j = 0; j < RUNTIME_READ_WRITE_VECTOR_LENGTH && i + j < iovcnt; j++) {
bufs[j] = uv_buf_init(worker_thread_get_memory_ptr_void(iov[i + j].base_offset, iov[i + j].len), bufs[j] = uv_buf_init(worker_thread_get_memory_ptr_void(iov[i + j].base_offset, iov[i + j].len),
@ -570,9 +571,9 @@ wasm_writev(i32 file_descriptor, i32 iov_offset, i32 iovcnt)
struct wasm_iovec *iov = worker_thread_get_memory_ptr_void(iov_offset, iovcnt * sizeof(struct wasm_iovec)); struct wasm_iovec *iov = worker_thread_get_memory_ptr_void(iov_offset, iovcnt * sizeof(struct wasm_iovec));
for (int i = 0; i < iovcnt; i += RUNTIME_READ_WRITE_VECTOR_LENGTH) { for (int i = 0; i < iovcnt; i += RUNTIME_READ_WRITE_VECTOR_LENGTH) {
uv_fs_t req = UV_FS_REQ_INIT(); uv_fs_t req = UV_FS_REQ_INIT();
uv_buf_t bufs[RUNTIME_READ_WRITE_VECTOR_LENGTH] = { 0 }; // avoid mallocs here! uv_buf_t bufs[RUNTIME_READ_WRITE_VECTOR_LENGTH] = { 0 }; // avoid mallocs here!
int j = 0; int j = 0;
for (j = 0; j < RUNTIME_READ_WRITE_VECTOR_LENGTH && i + j < iovcnt; j++) { for (j = 0; j < RUNTIME_READ_WRITE_VECTOR_LENGTH && i + j < iovcnt; j++) {
bufs[j] = uv_buf_init(worker_thread_get_memory_ptr_void(iov[i + j].base_offset, iov[i + j].len), bufs[j] = uv_buf_init(worker_thread_get_memory_ptr_void(iov[i + j].base_offset, iov[i + j].len),

@ -12,6 +12,7 @@
#include <runtime.h> #include <runtime.h>
#include <sandbox.h> #include <sandbox.h>
#include <software_interrupt.h> #include <software_interrupt.h>
#include <worker_thread.h>
// Conditionally used by debuglog when DEBUG is set // Conditionally used by debuglog when DEBUG is set
#ifdef DEBUG #ifdef DEBUG

@ -15,17 +15,11 @@
* Local Includes * * Local Includes *
**************************/ **************************/
#include <arch/context.h> #include <arch/context.h>
#include <current_sandbox.h>
#include <http_parser_settings.h> #include <http_parser_settings.h>
#include <module.h> #include <module.h>
#include <sandbox.h>
#include <sandbox_completion_queue.h>
#include <sandbox_request.h> #include <sandbox_request.h>
// #include <sandbox_request_scheduler_fifo.h> // #include <sandbox_request_scheduler_fifo.h>
#include <sandbox_request_scheduler_ps.h> #include <sandbox_request_scheduler_ps.h>
#include <sandbox_run_queue.h>
// #include <sandbox_run_queue_fifo.h>
#include <sandbox_run_queue_ps.h>
#include <software_interrupt.h> #include <software_interrupt.h>
#include <types.h> #include <types.h>
@ -144,262 +138,3 @@ listener_thread_initialize(void)
software_interrupt_initialize(); software_interrupt_initialize();
software_interrupt_arm_timer(); software_interrupt_arm_timer();
} }
/***************************
* Worker Thread State *
**************************/
// context pointer to switch to when this thread gets a SIGUSR1
__thread arch_context_t *worker_thread_next_context = NULL;
// context of the runtime thread before running sandboxes or to resume its "main".
__thread arch_context_t worker_thread_base_context;
// libuv i/o loop handle per sandboxing thread!
__thread uv_loop_t worker_thread_uvio_handle;
// Flag to signify if the thread is currently running callbacks in the libuv event loop
static __thread bool worker_thread_is_in_callback;
/**************************************************
* Worker Thread Logic
*************************************************/
/**
* @brief Switches to the next sandbox, placing the current sandbox of the completion queue if in RETURNED state
* @param next The Sandbox Context to switch to or NULL
* @return void
*/
static inline void
worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
{
arch_context_t *next_register_context = next_sandbox == NULL ? NULL : &next_sandbox->ctxt;
software_interrupt_disable();
// Get the old sandbox we're switching from
struct sandbox *previous_sandbox = current_sandbox_get();
arch_context_t *previous_register_context = previous_sandbox == NULL ? NULL : &previous_sandbox->ctxt;
// Set the current sandbox to the next
current_sandbox_set(next_sandbox);
// and switch to the associated context. But what is the purpose of worker_thread_next_context?
worker_thread_next_context = next_register_context;
arch_context_switch(previous_register_context, next_register_context);
// If the current sandbox we're switching from is in a RETURNED state, add to completion queue
if (previous_sandbox != NULL && previous_sandbox->state == RETURNED)
sandbox_completion_queue_add(previous_sandbox);
software_interrupt_enable();
}
/**
* Mark a blocked sandbox as runnable and add it to the runqueue
* @param sandbox the sandbox to check and update if blocked
**/
void
worker_thread_wakeup_sandbox(sandbox_t *sandbox)
{
software_interrupt_disable();
// debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
if (sandbox->state == BLOCKED) {
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
}
software_interrupt_enable();
}
/**
* Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head
*of the runqueue
**/
void
worker_thread_block_current_sandbox(void)
{
assert(worker_thread_is_in_callback == false);
software_interrupt_disable();
// Remove the sandbox we were just executing from the runqueue and mark as blocked
struct sandbox *previous_sandbox = current_sandbox_get();
sandbox_run_queue_remove(previous_sandbox);
previous_sandbox->state = BLOCKED;
// Switch to the next sandbox
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", previous_sandbox, previous_sandbox->module->name,
next_sandbox, next_sandbox ? next_sandbox->module->name : "");
software_interrupt_enable();
worker_thread_switch_to_sandbox(next_sandbox);
}
/**
* Execute I/O
**/
void
worker_thread_process_io(void)
{
#ifdef USE_HTTP_UVIO
#ifdef USE_HTTP_SYNC
// realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, not
// great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do
// async block!
uv_run(worker_thread_get_libuv_handle(), UV_RUN_DEFAULT);
#else /* USE_HTTP_SYNC */
worker_thread_block_current_sandbox();
#endif /* USE_HTTP_UVIO */
#else
assert(false);
// it should not be called if not using uvio for http
#endif
}
/**
* TODO: What is this doing?
**/
void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void)
{
pthread_kill(pthread_self(), SIGUSR1);
assert(false); // should not get here..
while (true)
;
}
/**
* Pulls up to 1..n sandbox requests, allocates them as sandboxes, sets them as runnable and places them on the local
* runqueue, and then frees the sandbox requests The batch size pulled at once is set by SANDBOX_PULL_BATCH_SIZE
* @return the number of sandbox requests pulled
*/
static inline int
worker_thread_pull_and_process_sandbox_requests(void)
{
int total_sandboxes_pulled = 0;
while (total_sandboxes_pulled < SANDBOX_PULL_BATCH_SIZE) {
sandbox_request_t *sandbox_request;
if ((sandbox_request = sandbox_request_scheduler_remove()) == NULL) break;
// Actually allocate the sandbox for the requests that we've pulled
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
// Set the sandbox as runnable and place on the local runqueue
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
total_sandboxes_pulled++;
}
return total_sandboxes_pulled;
}
/**
* Run all outstanding events in the local thread's libuv event loop
**/
void
worker_thread_execute_libuv_event_loop(void)
{
worker_thread_is_in_callback = true;
int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT), i = 0;
while (n > 0) {
n--;
uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT);
}
worker_thread_is_in_callback = false;
}
/**
* Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
* @return the sandbox to execute or NULL if none are available
**/
struct sandbox *
worker_thread_get_next_sandbox()
{
if (sandbox_run_queue_is_empty()) {
int sandboxes_pulled = worker_thread_pull_and_process_sandbox_requests();
if (sandboxes_pulled == 0) return NULL;
}
// Execute Round Robin Scheduling Logic
struct sandbox *next_sandbox = sandbox_run_queue_remove();
assert(next_sandbox->state != RETURNED);
sandbox_run_queue_add(next_sandbox);
debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name);
return next_sandbox;
}
/**
* The entry function for sandbox worker threads
* Initializes thread-local state, unmasks signals, sets up libuv loop and
* @param return_code - argument provided by pthread API. We set to -1 on error
**/
void *
worker_thread_main(void *return_code)
{
// Initialize Worker State
arch_context_init(&worker_thread_base_context, 0, 0);
// sandbox_run_queue_fifo_initialize();
sandbox_run_queue_ps_initialize();
sandbox_completion_queue_initialize();
software_interrupt_is_disabled = false;
worker_thread_next_context = NULL;
#ifndef PREEMPT_DISABLE
software_interrupt_unmask_signal(SIGALRM);
software_interrupt_unmask_signal(SIGUSR1);
#endif
uv_loop_init(&worker_thread_uvio_handle);
worker_thread_is_in_callback = false;
// Begin Worker Execution Loop
struct sandbox *next_sandbox;
while (true) {
assert(current_sandbox_get() == NULL);
// If "in a callback", the libuv event loop is triggering this, so we don't need to start it
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
software_interrupt_disable();
next_sandbox = worker_thread_get_next_sandbox();
software_interrupt_enable();
if (next_sandbox != NULL) {
worker_thread_switch_to_sandbox(next_sandbox);
sandbox_completion_queue_free(1);
}
}
*(int *)return_code = -1;
pthread_exit(return_code);
}
/**
* Called when the function in the sandbox exits
* Removes the standbox from the thread-local runqueue, sets its state to RETURNED,
* releases the linear memory, and then switches to the sandbox at the head of the runqueue
* TODO: Consider moving this to a future current_sandbox file. This has thus far proven difficult to move
**/
void
worker_thread_exit_current_sandbox(void)
{
// Remove the sandbox that exited from the runqueue and set state to RETURNED
struct sandbox *previous_sandbox = current_sandbox_get();
assert(previous_sandbox);
software_interrupt_disable();
sandbox_run_queue_remove(previous_sandbox);
previous_sandbox->state = RETURNED;
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
assert(next_sandbox != previous_sandbox);
software_interrupt_enable();
// Because the stack is still in use, only unmap linear memory and defer free resources until "main function
// execution"
munmap(previous_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
worker_thread_switch_to_sandbox(next_sandbox);
}

@ -1,5 +1,6 @@
#include <assert.h> #include <assert.h>
#include <runtime.h> #include <runtime.h>
#include <worker_thread.h>
#include <sandbox.h> #include <sandbox.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <pthread.h> #include <pthread.h>

@ -0,0 +1,282 @@
// Something is not idempotent with this or some other include.
// If placed in Local Includes, error is triggered that memset was implicitly declared
#include <runtime.h>
/***************************
* External Includes *
**************************/
#include <pthread.h> // POSIX Threads
#include <signal.h> // POSIX Signals
#include <sched.h> // Wasmception. Included as submodule
#include <sys/mman.h> // Wasmception. Included as submodule
#include <uv.h> // Libuv
/***************************
* Local Includes *
**************************/
#include <current_sandbox.h>
#include <sandbox_completion_queue.h>
#include <sandbox_request_scheduler.h>
#include <sandbox_run_queue.h>
// #include <sandbox_run_queue_fifo.h>
#include <sandbox_run_queue_ps.h>
#include <types.h>
#include <worker_thread.h>
/***************************
* Worker Thread State *
**************************/
// context pointer to switch to when this thread gets a SIGUSR1
__thread arch_context_t *worker_thread_next_context = NULL;
// context of the runtime thread before running sandboxes or to resume its "main".
__thread arch_context_t worker_thread_base_context;
// libuv i/o loop handle per sandboxing thread!
__thread uv_loop_t worker_thread_uvio_handle;
// Flag to signify if the thread is currently running callbacks in the libuv event loop
static __thread bool worker_thread_is_in_callback;
/**************************************************
* Worker Thread Logic
*************************************************/
/**
* @brief Switches to the next sandbox, placing the current sandbox of the completion queue if in RETURNED state
* @param next The Sandbox Context to switch to or NULL
* @return void
*/
static inline void
worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
{
arch_context_t *next_register_context = next_sandbox == NULL ? NULL : &next_sandbox->ctxt;
software_interrupt_disable();
// Get the old sandbox we're switching from
struct sandbox *previous_sandbox = current_sandbox_get();
arch_context_t *previous_register_context = previous_sandbox == NULL ? NULL : &previous_sandbox->ctxt;
// Set the current sandbox to the next
current_sandbox_set(next_sandbox);
// and switch to the associated context. But what is the purpose of worker_thread_next_context?
worker_thread_next_context = next_register_context;
arch_context_switch(previous_register_context, next_register_context);
// If the current sandbox we're switching from is in a RETURNED state, add to completion queue
if (previous_sandbox != NULL && previous_sandbox->state == RETURNED)
sandbox_completion_queue_add(previous_sandbox);
software_interrupt_enable();
}
/**
* Mark a blocked sandbox as runnable and add it to the runqueue
* @param sandbox the sandbox to check and update if blocked
**/
void
worker_thread_wakeup_sandbox(sandbox_t *sandbox)
{
software_interrupt_disable();
// debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
if (sandbox->state == BLOCKED) {
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
}
software_interrupt_enable();
}
/**
* Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head
*of the runqueue
**/
void
worker_thread_block_current_sandbox(void)
{
assert(worker_thread_is_in_callback == false);
software_interrupt_disable();
// Remove the sandbox we were just executing from the runqueue and mark as blocked
struct sandbox *previous_sandbox = current_sandbox_get();
sandbox_run_queue_remove(previous_sandbox);
previous_sandbox->state = BLOCKED;
// Switch to the next sandbox
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", previous_sandbox, previous_sandbox->module->name,
next_sandbox, next_sandbox ? next_sandbox->module->name : "");
software_interrupt_enable();
worker_thread_switch_to_sandbox(next_sandbox);
}
/**
* Execute I/O
**/
void
worker_thread_process_io(void)
{
#ifdef USE_HTTP_UVIO
#ifdef USE_HTTP_SYNC
// realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, not
// great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do
// async block!
uv_run(worker_thread_get_libuv_handle(), UV_RUN_DEFAULT);
#else /* USE_HTTP_SYNC */
worker_thread_block_current_sandbox();
#endif /* USE_HTTP_UVIO */
#else
assert(false);
// it should not be called if not using uvio for http
#endif
}
/**
* TODO: What is this doing?
**/
void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void)
{
pthread_kill(pthread_self(), SIGUSR1);
assert(false); // should not get here..
while (true)
;
}
/**
* Pulls up to 1..n sandbox requests, allocates them as sandboxes, sets them as runnable and places them on the local
* runqueue, and then frees the sandbox requests The batch size pulled at once is set by SANDBOX_PULL_BATCH_SIZE
* @return the number of sandbox requests pulled
*/
static inline int
worker_thread_pull_and_process_sandbox_requests(void)
{
int total_sandboxes_pulled = 0;
while (total_sandboxes_pulled < SANDBOX_PULL_BATCH_SIZE) {
sandbox_request_t *sandbox_request;
if ((sandbox_request = sandbox_request_scheduler_remove()) == NULL) break;
// Actually allocate the sandbox for the requests that we've pulled
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
// Set the sandbox as runnable and place on the local runqueue
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
total_sandboxes_pulled++;
}
return total_sandboxes_pulled;
}
/**
* Run all outstanding events in the local thread's libuv event loop
**/
void
worker_thread_execute_libuv_event_loop(void)
{
worker_thread_is_in_callback = true;
int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT), i = 0;
while (n > 0) {
n--;
uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT);
}
worker_thread_is_in_callback = false;
}
/**
* Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
* @return the sandbox to execute or NULL if none are available
**/
struct sandbox *
worker_thread_get_next_sandbox()
{
if (sandbox_run_queue_is_empty()) {
int sandboxes_pulled = worker_thread_pull_and_process_sandbox_requests();
if (sandboxes_pulled == 0) return NULL;
}
// Execute Round Robin Scheduling Logic
struct sandbox *next_sandbox = sandbox_run_queue_remove();
assert(next_sandbox->state != RETURNED);
sandbox_run_queue_add(next_sandbox);
debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name);
return next_sandbox;
}
/**
* The entry function for sandbox worker threads
* Initializes thread-local state, unmasks signals, sets up libuv loop and
* @param return_code - argument provided by pthread API. We set to -1 on error
**/
void *
worker_thread_main(void *return_code)
{
// Initialize Worker State
arch_context_init(&worker_thread_base_context, 0, 0);
// sandbox_run_queue_fifo_initialize();
sandbox_run_queue_ps_initialize();
sandbox_completion_queue_initialize();
software_interrupt_is_disabled = false;
worker_thread_next_context = NULL;
#ifndef PREEMPT_DISABLE
software_interrupt_unmask_signal(SIGALRM);
software_interrupt_unmask_signal(SIGUSR1);
#endif
uv_loop_init(&worker_thread_uvio_handle);
worker_thread_is_in_callback = false;
// Begin Worker Execution Loop
struct sandbox *next_sandbox;
while (true) {
assert(current_sandbox_get() == NULL);
// If "in a callback", the libuv event loop is triggering this, so we don't need to start it
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
software_interrupt_disable();
next_sandbox = worker_thread_get_next_sandbox();
software_interrupt_enable();
if (next_sandbox != NULL) {
worker_thread_switch_to_sandbox(next_sandbox);
sandbox_completion_queue_free(1);
}
}
*(int *)return_code = -1;
pthread_exit(return_code);
}
/**
* Called when the function in the sandbox exits
* Removes the standbox from the thread-local runqueue, sets its state to RETURNED,
* releases the linear memory, and then switches to the sandbox at the head of the runqueue
* TODO: Consider moving this to a future current_sandbox file. This has thus far proven difficult to move
**/
void
worker_thread_exit_current_sandbox(void)
{
// Remove the sandbox that exited from the runqueue and set state to RETURNED
struct sandbox *previous_sandbox = current_sandbox_get();
assert(previous_sandbox);
software_interrupt_disable();
sandbox_run_queue_remove(previous_sandbox);
previous_sandbox->state = RETURNED;
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
assert(next_sandbox != previous_sandbox);
software_interrupt_enable();
// Because the stack is still in use, only unmap linear memory and defer free resources until "main function
// execution"
munmap(previous_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
worker_thread_switch_to_sandbox(next_sandbox);
}
Loading…
Cancel
Save