chore: assorted refactors

main
Sean McBride 5 years ago
parent 9762477902
commit ea888ddbb3

@ -49,11 +49,15 @@ struct module {
mod_main_fn_t main;
};
struct module *module_find_by_name(char *name);
struct module *module_find_by_socket_descriptor(int socket_descriptor);
struct module *module_alloc(char *mod_name, char *mod_path, i32 argument_count, u32 stack_sz, u32 max_heap, u32 timeout, int port, int req_sz, int resp_sz);
void module_free(struct module *module);
struct module *find_module_by_name(char *name);
struct module *find_module_by_socket_descriptor(int socket_descriptor);
/***************************************
* Module "Methods"
***************************************/
struct module *module__new(char *mod_name, char *mod_path, i32 argument_count, u32 stack_sz, u32 max_heap, u32 timeout, int port, int req_sz, int resp_sz);
void module__free(struct module *module);
/**
* Sets the HTTP Request and Response Headers and Content type on a module
@ -66,7 +70,7 @@ void module_free(struct module *module);
* @param response_content_type
**/
static inline void
module_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[],
module__set_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[],
int response_count, char *response_headers, char response_content_type[])
{
assert(module);
@ -84,7 +88,7 @@ module_http_info(struct module *module, int request_count, char *request_headers
* @return 1 if valid. 0 if invalid
**/
static inline int
module_is_valid(struct module *module)
module__is_valid(struct module *module)
{
if (module && module->dynamic_library_handle && module->main) return 1;
return 0;
@ -98,7 +102,7 @@ module_is_valid(struct module *module)
* @param module
**/
static inline void
module_globals_init(struct module *module)
module__initialize_globals(struct module *module)
{
// called in a sandbox.
module->initialize_globals();
@ -109,7 +113,7 @@ module_globals_init(struct module *module)
* @param module
**/
static inline void
module_table_init(struct module *module)
module__initialize_table(struct module *module)
{
// called at module creation time (once only per module).
module->initialize_tables();
@ -120,7 +124,7 @@ module_table_init(struct module *module)
* @param module
**/
static inline void
module_libc_init(struct module *module, i32 env, i32 arguments)
module__initialize_libc(struct module *module, i32 env, i32 arguments)
{
// called in a sandbox.
module->initialize_libc(env, arguments);
@ -131,7 +135,7 @@ module_libc_init(struct module *module, i32 env, i32 arguments)
* @param module
**/
static inline void
module_memory_init(struct module *module)
module__initialize_memory(struct module *module)
{
// called in a sandbox.
module->initialize_memory();
@ -144,7 +148,7 @@ module_memory_init(struct module *module)
* @param argv standard UNIX vector of arguments
**/
static inline i32
module_entry(struct module *module, i32 argc, i32 argv)
module__main(struct module *module, i32 argc, i32 argv)
{
return module->main(argc, argv);
}
@ -154,7 +158,7 @@ module_entry(struct module *module, i32 argc, i32 argv)
* @param module
**/
static inline void
module_acquire(struct module *module)
module__acquire(struct module *module)
{
// TODO: atomic.
module->reference_count++;
@ -165,7 +169,7 @@ module_acquire(struct module *module)
* @param module
**/
static inline void
module_release(struct module *module)
module__release(struct module *module)
{
// TODO: atomic.
module->reference_count--;
@ -177,7 +181,7 @@ module_release(struct module *module)
* @returns the number of arguments
**/
static inline i32
module_argument_count(struct module *module)
module__get_argument_count(struct module *module)
{
return module->argument_count;
}

@ -21,12 +21,13 @@ INLINE char *get_memory_ptr_for_runtime(u32 offset, u32 bounds_check);
void initialize_runtime(void);
void initialize_listener_thread(void);
void stub_init(i32 offset);
void *worker_thread_main(void *return_code);
/**
* ???
* @param offset ????
* @param bounds_check ???
* @return ???
* TODO: ???
* @param offset TODO: ????
* @param bounds_check TODO: ???
* @return TODO: ???
**/
static inline void *
get_memory_ptr_void(u32 offset, u32 bounds_check)
@ -35,9 +36,9 @@ get_memory_ptr_void(u32 offset, u32 bounds_check)
}
/**
* ???
* @param offset ????
* @return ???
* TODO: ???
* @param offset TODO: ????
* @return TODO: ???
**/
static inline char *
get_memory_string(u32 offset)
@ -56,10 +57,10 @@ get_memory_string(u32 offset)
}
/**
* Get uvio
* Get global libuv handle
**/
static inline uv_loop_t *
runtime_uvio(void)
get_thread_libuv_handle(void)
{
return &uvio_handle;
}

@ -64,7 +64,7 @@ stub_init(i32 offset)
i32 env_vec_offset = offset;
memcpy(get_memory_ptr_for_runtime(env_vec_offset, sizeof(env_vec)), env_vec, sizeof(env_vec));
module_libc_init(get_current_sandbox()->module, env_vec_offset, program_name_offset);
module__initialize_libc(get_current_sandbox()->module, env_vec_offset, program_name_offset);
}
// Emulated syscall implementations
@ -121,7 +121,7 @@ wasm_read(i32 filedes, i32 buf_offset, i32 nbyte)
debuglog("[%p] start[%d:%d, n%d]\n", uv_fs_get_data(&req), filedes, f, nbyte);
uv_buf_t bufv = uv_buf_init(buffer, nbyte);
uv_fs_read(runtime_uvio(), &req, f, &bufv, 1, -1, wasm_fs_callback);
uv_fs_read(get_thread_libuv_handle(), &req, f, &bufv, 1, -1, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -152,7 +152,7 @@ wasm_write(i32 file_descriptor, i32 buf_offset, i32 buf_size)
char * buffer = get_memory_ptr_void(buf_offset, buf_size);
uv_buf_t bufv = uv_buf_init(buffer, buf_size);
uv_fs_write(runtime_uvio(), &req, f, &bufv, 1, -1, wasm_fs_callback);
uv_fs_write(get_thread_libuv_handle(), &req, f, &bufv, 1, -1, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -195,7 +195,7 @@ wasm_open(i32 path_off, i32 flags, i32 mode)
if (flags & WO_EXCL) modified_flags |= O_EXCL;
debuglog("[%p] start[%s:%d:%d]\n", uv_fs_get_data(&req), path, flags, modified_flags);
uv_fs_open(runtime_uvio(), &req, path, modified_flags, mode, wasm_fs_callback);
uv_fs_open(get_thread_libuv_handle(), &req, path, modified_flags, mode, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -232,7 +232,7 @@ wasm_close(i32 file_descriptor)
uv_fs_t req = UV_FS_REQ_INIT();
debuglog("[%p] file[%d,%d]\n", uv_fs_get_data(&req), file_descriptor, d);
uv_fs_close(runtime_uvio(), &req, d, wasm_fs_callback);
uv_fs_close(get_thread_libuv_handle(), &req, d, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -529,7 +529,7 @@ wasm_readv(i32 file_descriptor, i32 iov_offset, i32 iovcnt)
iov[i + j].len);
}
debuglog("[%p] start[%d,%d, n%d:%d]\n", uv_fs_get_data(&req), file_descriptor, d, i, j);
uv_fs_read(runtime_uvio(), &req, d, bufs, j, -1, wasm_fs_callback);
uv_fs_read(get_thread_libuv_handle(), &req, d, bufs, j, -1, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -576,7 +576,7 @@ wasm_writev(i32 file_descriptor, i32 iov_offset, i32 iovcnt)
iov[i + j].len);
}
debuglog("[%p] start[%d,%d, n%d:%d]\n", uv_fs_get_data(&req), file_descriptor, d, i, j);
uv_fs_write(runtime_uvio(), &req, d, bufs, j, -1, wasm_fs_callback);
uv_fs_write(get_thread_libuv_handle(), &req, d, bufs, j, -1, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -638,7 +638,7 @@ wasm_fsync(u32 file_descriptor)
int d = get_current_sandbox_file_descriptor(file_descriptor);
uv_fs_t req = UV_FS_REQ_INIT();
debuglog("[%p] start[%d,%d]\n", uv_fs_get_data(&req), file_descriptor, d);
uv_fs_fsync(runtime_uvio(), &req, d, wasm_fs_callback);
uv_fs_fsync(get_thread_libuv_handle(), &req, d, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -666,7 +666,7 @@ wasm_unlink(u32 path_str_offset)
char * str = get_memory_string(path_str_offset);
uv_fs_t req = UV_FS_REQ_INIT();
debuglog("[%p] start[%s]\n", uv_fs_get_data(&req), str);
uv_fs_unlink(runtime_uvio(), &req, str, wasm_fs_callback);
uv_fs_unlink(get_thread_libuv_handle(), &req, str, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -738,7 +738,7 @@ wasm_fchown(i32 file_descriptor, u32 owner, u32 group)
int d = get_current_sandbox_file_descriptor(file_descriptor);
uv_fs_t req = UV_FS_REQ_INIT();
debuglog("[%p] start[%d,%d]\n", uv_fs_get_data(&req), file_descriptor, d);
uv_fs_fchown(runtime_uvio(), &req, d, owner, group, wasm_fs_callback);
uv_fs_fchown(get_thread_libuv_handle(), &req, d, owner, group, wasm_fs_callback);
block_current_sandbox();
int ret = uv_fs_get_result(&req);
@ -787,12 +787,12 @@ wasm_socket(i32 domain, i32 type, i32 protocol)
if (type & SOCK_DGRAM) {
uv_udp_t *uh = (uv_udp_t *)h;
uh->data = c;
uv_udp_init(runtime_uvio(), uh);
uv_udp_init(get_thread_libuv_handle(), uh);
debuglog(" udp init done!\n");
} else if (type & SOCK_STREAM) {
uv_tcp_t *uh = (uv_tcp_t *)h;
uh->data = c;
uv_tcp_init(runtime_uvio(), uh);
uv_tcp_init(get_thread_libuv_handle(), uh);
debuglog(" tcp init done!\n");
} else {
assert(0); // not supported yet!
@ -832,7 +832,7 @@ wasm_connect(i32 sockfd, i32 sockaddr_offset, i32 addrlen)
i32
wasm_accept(i32 sockfd, i32 sockaddr_offset, i32 addrlen_offset)
{
// what do we do with the sockaddr ????
// what do we do with the sockaddr TODO: ????
socklen_t * addrlen = get_memory_ptr_void(addrlen_offset, sizeof(socklen_t));
struct sockaddr * socket_address = get_memory_ptr_void(sockaddr_offset, *addrlen);
union uv_any_handle *s = get_current_sandbox_libuv_handle(sockfd);
@ -844,7 +844,7 @@ wasm_accept(i32 sockfd, i32 sockaddr_offset, i32 addrlen_offset)
// assert so we can look into whether we need to implement UDP or others..
assert(((uv_handle_t *)s)->type == UV_TCP);
union uv_any_handle *h = get_current_sandbox_libuv_handle(cfd);
uv_tcp_init(runtime_uvio(), (uv_tcp_t *)h);
uv_tcp_init(get_thread_libuv_handle(), (uv_tcp_t *)h);
debuglog("[%p] tcp init %d\n", c, cfd);
int r = uv_accept((uv_stream_t *)s, (uv_stream_t *)h);
if (r < 0) return r;

@ -125,7 +125,7 @@ void
start_worker_threads()
{
for (int i = 0; i < total_worker_processors; i++) {
int ret = pthread_create(&worker_threads[i], NULL, sandbox_worker_main,
int ret = pthread_create(&worker_threads[i], NULL, worker_thread_main,
(void *)&worker_threads_argument[i]);
if (ret) {
errno = ret;

@ -6,6 +6,10 @@
#include <uv.h>
#include <util.h>
/***************************************
* Module Database
***************************************/
// In-memory representation of all active modules
static struct module *__mod_db[MOD_MAX] = { NULL };
// TODO: What is this?
@ -17,7 +21,7 @@ static int __mod_free_off = 0;
* @return module or NULL if no match found
**/
struct module *
module_find_by_name(char *name)
find_module_by_name(char *name)
{
int f = __mod_free_off;
for (int i = 0; i < f; i++) {
@ -33,7 +37,7 @@ module_find_by_name(char *name)
* @return module or NULL if no match found
**/
struct module *
module_find_by_socket_descriptor(int socket_descriptor)
find_module_by_socket_descriptor(int socket_descriptor)
{
int f = __mod_free_off;
for (int i = 0; i < f; i++) {
@ -49,7 +53,7 @@ module_find_by_socket_descriptor(int socket_descriptor)
* @return 0 on success. Aborts program on failure
**/
static inline int
module_add(struct module *module)
add_module(struct module *module)
{
assert(module->socket_descriptor == -1);
@ -61,12 +65,16 @@ module_add(struct module *module)
return 0;
}
/***************************************
* Module "Methods"
***************************************/
/**
* Start the module as a server listening at module->port
* @param module
**/
static inline void
module_server_init(struct module *module)
module__initialize_as_server(struct module *module)
{
// Allocate a new socket
int socket_descriptor = socket(AF_INET, SOCK_STREAM, 0);
@ -102,7 +110,30 @@ module_server_init(struct module *module)
}
/**
* Module Mega Setup Function
* Module Mega Teardown Function
* Closes the socket and dynamic library, and then frees the module
* Returns harmlessly if there are outstanding references
* @param module - the module to teardown
**/
void
module__free(struct module *module)
{
if (module == NULL) return;
if (module->dynamic_library_handle == NULL) return;
// Do not free if we still have oustanding references
if (module->reference_count) return;
// TODO: What about the module database? Do we need to do any cleanup there?
close(module->socket_descriptor);
dlclose(module->dynamic_library_handle);
free(module);
}
/**
* Module Contructor
* Creates a new module, invokes initialize_tables to initialize the indirect table, adds it to the module DB, and starts
*listening for HTTP Requests
*
@ -117,7 +148,7 @@ module_server_init(struct module *module)
* @returns A new module or NULL in case of failure
**/
struct module *
module_alloc(char *name, char *path, i32 argument_count, u32 stack_size, u32 max_memory, u32 timeout, int port,
module__new(char *name, char *path, i32 argument_count, u32 stack_size, u32 max_memory, u32 timeout, int port,
int request_size, int response_size)
{
struct module *module = (struct module *)malloc(sizeof(struct module));
@ -167,21 +198,21 @@ module_alloc(char *name, char *path, i32 argument_count, u32 stack_size, u32 max
// assumption: All modules are created at program start before we enable preemption or enable the execution of
// any worker threads We are checking that thread-local module_indirect_table is NULL to prove that we aren't
// yet preempting If we want to be able to do this later, we can possibly defer module_table_init until the
// yet preempting If we want to be able to do this later, we can possibly defer module__initialize_table until the
// first invocation
assert(cache_tbl == NULL);
// TODO: determine why we have to set the module_indirect_table state before calling table init and then restore
// the existing value What is the relationship between these things?
module_indirect_table = module->indirect_table;
module_table_init(module);
module__initialize_table(module);
module_indirect_table = cache_tbl;
// Add the module to the in-memory module DB
module_add(module);
add_module(module);
// Start listening for requests
module_server_init(module);
module__initialize_as_server(module);
return module;
@ -194,24 +225,3 @@ dl_open_error:
return NULL;
}
/**
* Module Mega Teardown Function
* Closes the socket and dynamic library, and then frees the module
* Returns harmlessly if there are outstanding references
* @param module - the module to teardown
**/
void
module_free(struct module *module)
{
if (module == NULL) return;
if (module->dynamic_library_handle == NULL) return;
// Do not free if we still have oustanding references
if (module->reference_count) return;
// TODO: What about the module database? Do we need to do any cleanup there?
close(module->socket_descriptor);
dlclose(module->dynamic_library_handle);
free(module);
}

@ -19,8 +19,117 @@ struct deque_sandbox *global_deque;
pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER;
int epoll_file_descriptor;
/******************************************
* Shared Process / Listener Thread Logic *
******************************************/
/**
* Initialize runtime global state, mask signals, and init http parser
*/
void
initialize_runtime(void)
{
epoll_file_descriptor = epoll_create1(0);
assert(epoll_file_descriptor >= 0);
// Allocate and Initialize the global deque
global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox));
assert(global_deque);
// Note: Below is a Macro
deque_init_sandbox(global_deque, SBOX_MAX_REQS);
// Mask Signals
softint_mask(SIGUSR1);
softint_mask(SIGALRM);
// Initialize http-parser
http_init();
}
/********************************
* Listener Thread Logic *
********************************/
/**
* @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the
* sandbox object to the global dequeue
* @param dummy data pointer provided by pthreads API. Unused in this function
* @return NULL
*
* Used Globals:
* epoll_file_descriptor - the epoll file descriptor
*
*/
void *
listener_thread_main(void *dummy)
{
struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event));
int total_requests = 0;
while (true) {
int request_count = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1);
u64 start_time = rdtsc();
for (int i = 0; i < request_count; i++) {
if (epoll_events[i].events & EPOLLERR) {
perror("epoll_wait");
assert(0);
}
struct sockaddr_in client_address;
socklen_t client_length = sizeof(client_address);
struct module * module = (struct module *)epoll_events[i].data.ptr;
assert(module);
int es = module->socket_descriptor;
int socket_descriptor = accept(es, (struct sockaddr *)&client_address, &client_length);
if (socket_descriptor < 0) {
perror("accept");
assert(0);
}
total_requests++;
printf("Received Request %d at %lu\n", total_requests, start_time);
sandbox_request_t *sandbox_request = allocate_sandbox_request(
module,
module->name,
socket_descriptor,
(const struct sockaddr *)&client_address,
start_time);
assert(sandbox_request);
// TODO: Refactor allocate_sandbox_request to not add to global request queue and do this here
}
}
free(epoll_events);
return NULL;
}
/**
* Initializes the listener thread, pinned to core 0, and starts to listen for requests
*/
void
initialize_listener_thread(void)
{
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(MOD_REQ_CORE, &cs);
pthread_t listener_thread;
int ret = pthread_create(&listener_thread, NULL, listener_thread_main, NULL);
assert(ret == 0);
ret = pthread_setaffinity_np(listener_thread, sizeof(cpu_set_t), &cs);
assert(ret == 0);
ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs);
assert(ret == 0);
softint_init();
softint_timer_arm();
}
/***************************
* Thread Local State *
* Worker Thread State *
**************************/
__thread static struct ps_list_head local_run_queue;
@ -42,8 +151,84 @@ __thread uv_loop_t uvio_handle;
// Flag to signify if the thread is currently running callbacks in the libuv event loop
static __thread unsigned int in_callback;
/**************************************************
* Worker Thread Logic
*************************************************/
static inline void add_sandbox_to_local_run_queue(struct sandbox *sandbox);
/**
* If this sandbox is blocked, mark it as runnable and add to the head of the thread-local runqueue
* @param sandbox the sandbox to check and update if blocked
**/
void
wakeup_sandbox(sandbox_t *sandbox)
{
softint_disable();
debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
if (sandbox->state != BLOCKED) goto done;
assert(sandbox->state == BLOCKED);
assert(ps_list_singleton_d(sandbox));
sandbox->state = RUNNABLE;
ps_list_head_append_d(&local_run_queue, sandbox);
done:
softint_enable();
}
/**
* Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head of the runqueue
**/
void
block_current_sandbox(void)
{
assert(in_callback == 0);
softint_disable();
struct sandbox *current_sandbox = get_current_sandbox();
// TODO: What is this getting removed from again? the thread-local runqueue?
ps_list_rem_d(current_sandbox);
current_sandbox->state = BLOCKED;
struct sandbox *next_sandbox = get_next_sandbox_from_local_run_queue(0);
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : "");
softint_enable();
switch_to_sandbox(next_sandbox);
}
/**
* TODO: What is this doing?
**/
void
sandbox_block_http(void)
{
#ifdef USE_HTTP_UVIO
#ifdef USE_HTTP_SYNC
// realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, not
// great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do
// async block!
uv_run(get_thread_libuv_handle(), UV_RUN_DEFAULT);
#else /* USE_HTTP_SYNC */
block_current_sandbox();
#endif /* USE_HTTP_UVIO */
#else
assert(0);
// it should not be called if not using uvio for http
#endif
}
/**
* TODO: What is this doing?
**/
void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void)
{
pthread_kill(pthread_self(), SIGUSR1);
assert(0); // should not get here..
while (true)
;
}
/**
* Pulls up to 1..n sandbox requests, allocates them as sandboxes, sets them as runnable and places them on the local
* runqueue, and then frees the sandbox requests The batch size pulled at once is set by SBOX_PULL_MAX
@ -73,16 +258,16 @@ pull_sandbox_requests_from_global_runqueue(void)
}
/**
* Run all outstanding events in the libuv event loop
* Run all outstanding events in the local thread's libuv event loop
**/
void
execute_libuv_event_loop(void)
{
in_callback = 1;
int n = uv_run(runtime_uvio(), UV_RUN_NOWAIT), i = 0;
int n = uv_run(get_thread_libuv_handle(), UV_RUN_NOWAIT), i = 0;
while (n > 0) {
n--;
uv_run(runtime_uvio(), UV_RUN_NOWAIT);
uv_run(get_thread_libuv_handle(), UV_RUN_NOWAIT);
}
in_callback = 0;
}
@ -171,85 +356,13 @@ free_sandboxes_from_completion_queue(unsigned int number_to_free)
}
}
/**
* If this sandbox is blocked, mark it as runnable and add to the head of the thread-local runqueue
* @param sandbox the sandbox to check and update if blocked
**/
void
wakeup_sandbox(sandbox_t *sandbox)
{
softint_disable();
debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
if (sandbox->state != BLOCKED) goto done;
assert(sandbox->state == BLOCKED);
assert(ps_list_singleton_d(sandbox));
sandbox->state = RUNNABLE;
ps_list_head_append_d(&local_run_queue, sandbox);
done:
softint_enable();
}
/**
* Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head of the runqueue
**/
void
block_current_sandbox(void)
{
assert(in_callback == 0);
softint_disable();
struct sandbox *current_sandbox = get_current_sandbox();
// TODO: What is this getting removed from again? the thread-local runqueue?
ps_list_rem_d(current_sandbox);
current_sandbox->state = BLOCKED;
struct sandbox *next_sandbox = get_next_sandbox_from_local_run_queue(0);
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : "");
softint_enable();
switch_to_sandbox(next_sandbox);
}
/**
* TODO: What is this doing?
**/
void
sandbox_block_http(void)
{
#ifdef USE_HTTP_UVIO
#ifdef USE_HTTP_SYNC
// realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, not
// great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do
// async block!
uv_run(runtime_uvio(), UV_RUN_DEFAULT);
#else /* USE_HTTP_SYNC */
block_current_sandbox();
#endif /* USE_HTTP_UVIO */
#else
assert(0);
// it should not be called if not using uvio for http
#endif
}
/**
* TODO: What is this doing?
**/
void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void)
{
pthread_kill(pthread_self(), SIGUSR1);
assert(0); // should not get here..
while (true)
;
}
/**
* Tries to free a completed request, executes libuv callbacks, and then gets
* and returns the standbox at the head of the thread-local runqueue
* @return sandbox or NULL
**/
struct sandbox *
sandbox_worker_single_loop(void)
worker_thread_single_loop(void)
{
assert(get_current_sandbox() == NULL);
// Try to free one sandbox from the completion queue
@ -271,7 +384,7 @@ sandbox_worker_single_loop(void)
* @param return_code - argument provided by pthread API. We set to -1 on error
**/
void *
sandbox_worker_main(void *return_code)
worker_thread_main(void *return_code)
{
arch_context_init(&base_context, 0, 0);
@ -287,10 +400,10 @@ sandbox_worker_main(void *return_code)
in_callback = 0;
while (true) {
struct sandbox *sandbox = sandbox_worker_single_loop();
struct sandbox *sandbox = worker_thread_single_loop();
while (sandbox) {
switch_to_sandbox(sandbox);
sandbox = sandbox_worker_single_loop();
sandbox = worker_thread_single_loop();
}
}
@ -298,11 +411,14 @@ sandbox_worker_main(void *return_code)
pthread_exit(return_code);
}
// RANDOM... Is this in the right place?
/**
* Called when the function in the sandbox exits
* Removes the standbox from the thread-local runqueue, sets its state to RETURNED,
* releases the linear memory, and then switches to the sandbox at the head of the runqueue
* TODO: Why are we not adding to the completion queue here? That logic is commented out.
* TODO: Does this belong in sandbox.c?
**/
void
exit_current_sandbox(void)
@ -321,105 +437,4 @@ exit_current_sandbox(void)
munmap(current_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE);
// add_sandbox_to_completion_queue(current_sandbox);
switch_to_sandbox(next_sandbox);
}
/**
* @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the
* sandbox object to the global dequeue
* @param dummy data pointer provided by pthreads API. Unused in this function
* @return NULL
*
* Used Globals:
* epoll_file_descriptor - the epoll file descriptor
*
*/
void *
listener_thread_main(void *dummy)
{
struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event));
int total_requests = 0;
while (true) {
int request_count = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1);
u64 start_time = rdtsc();
for (int i = 0; i < request_count; i++) {
if (epoll_events[i].events & EPOLLERR) {
perror("epoll_wait");
assert(0);
}
struct sockaddr_in client_address;
socklen_t client_length = sizeof(client_address);
struct module * module = (struct module *)epoll_events[i].data.ptr;
assert(module);
int es = module->socket_descriptor;
int socket_descriptor = accept(es, (struct sockaddr *)&client_address, &client_length);
if (socket_descriptor < 0) {
perror("accept");
assert(0);
}
total_requests++;
printf("Received Request %d at %lu\n", total_requests, start_time);
sandbox_request_t *sandbox_request = allocate_sandbox_request(
module,
module->name,
socket_descriptor,
(const struct sockaddr *)&client_address,
start_time);
assert(sandbox_request);
// TODO: Refactor allocate_sandbox_request to not add to global request queue and do this here
}
}
free(epoll_events);
return NULL;
}
/**
* Initialize runtime global state, mask signals, and init http parser
*/
void
initialize_runtime(void)
{
epoll_file_descriptor = epoll_create1(0);
assert(epoll_file_descriptor >= 0);
// Allocate and Initialize the global deque
global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox));
assert(global_deque);
// Note: Below is a Macro
deque_init_sandbox(global_deque, SBOX_MAX_REQS);
// Mask Signals
softint_mask(SIGUSR1);
softint_mask(SIGALRM);
// Initialize http-parser
http_init();
}
/**
* Initializes the listener thread, pinned to core 0, and starts to listen for requests
*/
void
initialize_listener_thread(void)
{
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(MOD_REQ_CORE, &cs);
pthread_t listener_thread;
int ret = pthread_create(&listener_thread, NULL, listener_thread_main, NULL);
assert(ret == 0);
ret = pthread_setaffinity_np(listener_thread, sizeof(cpu_set_t), &cs);
assert(ret == 0);
ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs);
assert(ret == 0);
softint_init();
softint_timer_arm();
}
}

@ -127,7 +127,7 @@ allocate_sandbox_memory(struct module *module)
sandbox->linear_memory_size = linear_memory_size;
sandbox->module = module;
sandbox->sandbox_size = sandbox_size;
module_acquire(module);
module__acquire(module);
return sandbox;
}
@ -284,7 +284,7 @@ sandbox_main(void)
softint_enable();
}
struct module *current_module = get_sandbox_module(current_sandbox);
int argument_count = module_argument_count(current_module);
int argument_count = module__get_argument_count(current_module);
// for stdio
// Try to initialize file descriptors 0, 1, and 2 as io handles 0, 1, 2
@ -309,7 +309,7 @@ sandbox_main(void)
#ifdef USE_HTTP_UVIO
// Initialize libuv TCP stream
int r = uv_tcp_init(runtime_uvio(), (uv_tcp_t *)&current_sandbox->client_libuv_stream);
int r = uv_tcp_init(get_thread_libuv_handle(), (uv_tcp_t *)&current_sandbox->client_libuv_stream);
assert(r == 0);
// Set the current sandbox as the data the libuv callbacks have access to
@ -329,15 +329,15 @@ sandbox_main(void)
// Allocate the WebAssembly Sandbox
alloc_linear_memory();
// perhaps only initialized for the first instance? or TODO!
// module_table_init(current_module);
module_globals_init(current_module);
module_memory_init(current_module);
// module__initialize_table(current_module);
module__initialize_globals(current_module);
module__initialize_memory(current_module);
// Copy the arguments into the WebAssembly sandbox
setup_sandbox_arguments(argument_count);
// Executing the function within the WebAssembly sandbox
current_sandbox->return_value = module_entry(current_module, argument_count, current_sandbox->arguments_offset);
current_sandbox->return_value = module__main(current_module, argument_count, current_sandbox->arguments_offset);
// Retrieve the result from the WebAssembly sandbox, construct the HTTP response, and send to client
build_and_send_current_sandbox_client_response();
@ -357,7 +357,7 @@ sandbox_main(void)
struct sandbox *
allocate_sandbox(struct module *module, char *arguments, int socket_descriptor, const struct sockaddr *socket_address, u64 start_time)
{
if (!module_is_valid(module)) return NULL;
if (!module__is_valid(module)) return NULL;
// FIXME: don't use malloc. huge security problem!
// perhaps, main should be in its own sandbox, when it is not running any sandbox.
@ -401,7 +401,7 @@ free_sandbox(struct sandbox *sandbox)
int sz = sizeof(struct sandbox);
sz += sandbox->module->max_request_or_response_size;
module_release(sandbox->module);
module__release(sandbox->module);
// TODO free(sandbox->arguments);
void * stkaddr = sandbox->stack_start;

@ -102,7 +102,7 @@ extern pthread_t worker_threads[];
/**
* The handler function for Software Interrupts (signals)
* SIGALRM is executed periodically by an interval timer, causing preemption of the current sandbox
* SIGUSR1 does ??????
* SIGUSR1 does TODO: ???TODO: ???
* @param signal_type
* @param signal_info data structure containing signal info
* @param user_context_raw void* to a user_context struct

@ -162,10 +162,10 @@ util_parse_modules_file_json(char *file_name)
if (is_active == 0) continue;
// Allocate a module based on the values from the JSON
struct module *module = module_alloc(module_name, module_path, argument_count, 0, 0, 0, port,
struct module *module = module__new(module_name, module_path, argument_count, 0, 0, 0, port,
request_size, response_size);
assert(module);
module_http_info(module, request_count, request_headers, request_content_type, response_count,
module__set_http_info(module, request_count, request_headers, request_content_type, response_count,
reponse_headers, response_content_type);
module_count++;
free(request_headers);

Loading…
Cancel
Save