From ea888ddbb36ac7122ee4fdeca670c335226084b0 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Sun, 8 Mar 2020 16:10:54 -0400 Subject: [PATCH] chore: assorted refactors --- runtime/include/module.h | 32 ++-- runtime/include/runtime.h | 19 +- runtime/src/libc/uvio.c | 28 +-- runtime/src/main.c | 2 +- runtime/src/module.c | 72 ++++---- runtime/src/runtime.c | 379 ++++++++++++++++++++------------------ runtime/src/sandbox.c | 18 +- runtime/src/softint.c | 2 +- runtime/src/util.c | 4 +- 9 files changed, 293 insertions(+), 263 deletions(-) diff --git a/runtime/include/module.h b/runtime/include/module.h index 054826f..08a406c 100644 --- a/runtime/include/module.h +++ b/runtime/include/module.h @@ -49,11 +49,15 @@ struct module { mod_main_fn_t main; }; -struct module *module_find_by_name(char *name); -struct module *module_find_by_socket_descriptor(int socket_descriptor); -struct module *module_alloc(char *mod_name, char *mod_path, i32 argument_count, u32 stack_sz, u32 max_heap, u32 timeout, int port, int req_sz, int resp_sz); -void module_free(struct module *module); +struct module *find_module_by_name(char *name); +struct module *find_module_by_socket_descriptor(int socket_descriptor); +/*************************************** + * Module "Methods" + ***************************************/ + +struct module *module__new(char *mod_name, char *mod_path, i32 argument_count, u32 stack_sz, u32 max_heap, u32 timeout, int port, int req_sz, int resp_sz); +void module__free(struct module *module); /** * Sets the HTTP Request and Response Headers and Content type on a module @@ -66,7 +70,7 @@ void module_free(struct module *module); * @param response_content_type **/ static inline void -module_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[], +module__set_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[], int response_count, char *response_headers, char response_content_type[]) { assert(module); @@ -84,7 +88,7 @@ module_http_info(struct module *module, int request_count, char *request_headers * @return 1 if valid. 0 if invalid **/ static inline int -module_is_valid(struct module *module) +module__is_valid(struct module *module) { if (module && module->dynamic_library_handle && module->main) return 1; return 0; @@ -98,7 +102,7 @@ module_is_valid(struct module *module) * @param module **/ static inline void -module_globals_init(struct module *module) +module__initialize_globals(struct module *module) { // called in a sandbox. module->initialize_globals(); @@ -109,7 +113,7 @@ module_globals_init(struct module *module) * @param module **/ static inline void -module_table_init(struct module *module) +module__initialize_table(struct module *module) { // called at module creation time (once only per module). module->initialize_tables(); @@ -120,7 +124,7 @@ module_table_init(struct module *module) * @param module **/ static inline void -module_libc_init(struct module *module, i32 env, i32 arguments) +module__initialize_libc(struct module *module, i32 env, i32 arguments) { // called in a sandbox. module->initialize_libc(env, arguments); @@ -131,7 +135,7 @@ module_libc_init(struct module *module, i32 env, i32 arguments) * @param module **/ static inline void -module_memory_init(struct module *module) +module__initialize_memory(struct module *module) { // called in a sandbox. module->initialize_memory(); @@ -144,7 +148,7 @@ module_memory_init(struct module *module) * @param argv standard UNIX vector of arguments **/ static inline i32 -module_entry(struct module *module, i32 argc, i32 argv) +module__main(struct module *module, i32 argc, i32 argv) { return module->main(argc, argv); } @@ -154,7 +158,7 @@ module_entry(struct module *module, i32 argc, i32 argv) * @param module **/ static inline void -module_acquire(struct module *module) +module__acquire(struct module *module) { // TODO: atomic. module->reference_count++; @@ -165,7 +169,7 @@ module_acquire(struct module *module) * @param module **/ static inline void -module_release(struct module *module) +module__release(struct module *module) { // TODO: atomic. module->reference_count--; @@ -177,7 +181,7 @@ module_release(struct module *module) * @returns the number of arguments **/ static inline i32 -module_argument_count(struct module *module) +module__get_argument_count(struct module *module) { return module->argument_count; } diff --git a/runtime/include/runtime.h b/runtime/include/runtime.h index 018a330..a6b48ad 100644 --- a/runtime/include/runtime.h +++ b/runtime/include/runtime.h @@ -21,12 +21,13 @@ INLINE char *get_memory_ptr_for_runtime(u32 offset, u32 bounds_check); void initialize_runtime(void); void initialize_listener_thread(void); void stub_init(i32 offset); +void *worker_thread_main(void *return_code); /** - * ??? - * @param offset ???? - * @param bounds_check ??? - * @return ??? + * TODO: ??? + * @param offset TODO: ???? + * @param bounds_check TODO: ??? + * @return TODO: ??? **/ static inline void * get_memory_ptr_void(u32 offset, u32 bounds_check) @@ -35,9 +36,9 @@ get_memory_ptr_void(u32 offset, u32 bounds_check) } /** - * ??? - * @param offset ???? - * @return ??? + * TODO: ??? + * @param offset TODO: ???? + * @return TODO: ??? **/ static inline char * get_memory_string(u32 offset) @@ -56,10 +57,10 @@ get_memory_string(u32 offset) } /** - * Get uvio + * Get global libuv handle **/ static inline uv_loop_t * -runtime_uvio(void) +get_thread_libuv_handle(void) { return &uvio_handle; } diff --git a/runtime/src/libc/uvio.c b/runtime/src/libc/uvio.c index 1103954..01028ec 100644 --- a/runtime/src/libc/uvio.c +++ b/runtime/src/libc/uvio.c @@ -64,7 +64,7 @@ stub_init(i32 offset) i32 env_vec_offset = offset; memcpy(get_memory_ptr_for_runtime(env_vec_offset, sizeof(env_vec)), env_vec, sizeof(env_vec)); - module_libc_init(get_current_sandbox()->module, env_vec_offset, program_name_offset); + module__initialize_libc(get_current_sandbox()->module, env_vec_offset, program_name_offset); } // Emulated syscall implementations @@ -121,7 +121,7 @@ wasm_read(i32 filedes, i32 buf_offset, i32 nbyte) debuglog("[%p] start[%d:%d, n%d]\n", uv_fs_get_data(&req), filedes, f, nbyte); uv_buf_t bufv = uv_buf_init(buffer, nbyte); - uv_fs_read(runtime_uvio(), &req, f, &bufv, 1, -1, wasm_fs_callback); + uv_fs_read(get_thread_libuv_handle(), &req, f, &bufv, 1, -1, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -152,7 +152,7 @@ wasm_write(i32 file_descriptor, i32 buf_offset, i32 buf_size) char * buffer = get_memory_ptr_void(buf_offset, buf_size); uv_buf_t bufv = uv_buf_init(buffer, buf_size); - uv_fs_write(runtime_uvio(), &req, f, &bufv, 1, -1, wasm_fs_callback); + uv_fs_write(get_thread_libuv_handle(), &req, f, &bufv, 1, -1, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -195,7 +195,7 @@ wasm_open(i32 path_off, i32 flags, i32 mode) if (flags & WO_EXCL) modified_flags |= O_EXCL; debuglog("[%p] start[%s:%d:%d]\n", uv_fs_get_data(&req), path, flags, modified_flags); - uv_fs_open(runtime_uvio(), &req, path, modified_flags, mode, wasm_fs_callback); + uv_fs_open(get_thread_libuv_handle(), &req, path, modified_flags, mode, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -232,7 +232,7 @@ wasm_close(i32 file_descriptor) uv_fs_t req = UV_FS_REQ_INIT(); debuglog("[%p] file[%d,%d]\n", uv_fs_get_data(&req), file_descriptor, d); - uv_fs_close(runtime_uvio(), &req, d, wasm_fs_callback); + uv_fs_close(get_thread_libuv_handle(), &req, d, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -529,7 +529,7 @@ wasm_readv(i32 file_descriptor, i32 iov_offset, i32 iovcnt) iov[i + j].len); } debuglog("[%p] start[%d,%d, n%d:%d]\n", uv_fs_get_data(&req), file_descriptor, d, i, j); - uv_fs_read(runtime_uvio(), &req, d, bufs, j, -1, wasm_fs_callback); + uv_fs_read(get_thread_libuv_handle(), &req, d, bufs, j, -1, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -576,7 +576,7 @@ wasm_writev(i32 file_descriptor, i32 iov_offset, i32 iovcnt) iov[i + j].len); } debuglog("[%p] start[%d,%d, n%d:%d]\n", uv_fs_get_data(&req), file_descriptor, d, i, j); - uv_fs_write(runtime_uvio(), &req, d, bufs, j, -1, wasm_fs_callback); + uv_fs_write(get_thread_libuv_handle(), &req, d, bufs, j, -1, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -638,7 +638,7 @@ wasm_fsync(u32 file_descriptor) int d = get_current_sandbox_file_descriptor(file_descriptor); uv_fs_t req = UV_FS_REQ_INIT(); debuglog("[%p] start[%d,%d]\n", uv_fs_get_data(&req), file_descriptor, d); - uv_fs_fsync(runtime_uvio(), &req, d, wasm_fs_callback); + uv_fs_fsync(get_thread_libuv_handle(), &req, d, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -666,7 +666,7 @@ wasm_unlink(u32 path_str_offset) char * str = get_memory_string(path_str_offset); uv_fs_t req = UV_FS_REQ_INIT(); debuglog("[%p] start[%s]\n", uv_fs_get_data(&req), str); - uv_fs_unlink(runtime_uvio(), &req, str, wasm_fs_callback); + uv_fs_unlink(get_thread_libuv_handle(), &req, str, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -738,7 +738,7 @@ wasm_fchown(i32 file_descriptor, u32 owner, u32 group) int d = get_current_sandbox_file_descriptor(file_descriptor); uv_fs_t req = UV_FS_REQ_INIT(); debuglog("[%p] start[%d,%d]\n", uv_fs_get_data(&req), file_descriptor, d); - uv_fs_fchown(runtime_uvio(), &req, d, owner, group, wasm_fs_callback); + uv_fs_fchown(get_thread_libuv_handle(), &req, d, owner, group, wasm_fs_callback); block_current_sandbox(); int ret = uv_fs_get_result(&req); @@ -787,12 +787,12 @@ wasm_socket(i32 domain, i32 type, i32 protocol) if (type & SOCK_DGRAM) { uv_udp_t *uh = (uv_udp_t *)h; uh->data = c; - uv_udp_init(runtime_uvio(), uh); + uv_udp_init(get_thread_libuv_handle(), uh); debuglog(" udp init done!\n"); } else if (type & SOCK_STREAM) { uv_tcp_t *uh = (uv_tcp_t *)h; uh->data = c; - uv_tcp_init(runtime_uvio(), uh); + uv_tcp_init(get_thread_libuv_handle(), uh); debuglog(" tcp init done!\n"); } else { assert(0); // not supported yet! @@ -832,7 +832,7 @@ wasm_connect(i32 sockfd, i32 sockaddr_offset, i32 addrlen) i32 wasm_accept(i32 sockfd, i32 sockaddr_offset, i32 addrlen_offset) { - // what do we do with the sockaddr ???? + // what do we do with the sockaddr TODO: ???? socklen_t * addrlen = get_memory_ptr_void(addrlen_offset, sizeof(socklen_t)); struct sockaddr * socket_address = get_memory_ptr_void(sockaddr_offset, *addrlen); union uv_any_handle *s = get_current_sandbox_libuv_handle(sockfd); @@ -844,7 +844,7 @@ wasm_accept(i32 sockfd, i32 sockaddr_offset, i32 addrlen_offset) // assert so we can look into whether we need to implement UDP or others.. assert(((uv_handle_t *)s)->type == UV_TCP); union uv_any_handle *h = get_current_sandbox_libuv_handle(cfd); - uv_tcp_init(runtime_uvio(), (uv_tcp_t *)h); + uv_tcp_init(get_thread_libuv_handle(), (uv_tcp_t *)h); debuglog("[%p] tcp init %d\n", c, cfd); int r = uv_accept((uv_stream_t *)s, (uv_stream_t *)h); if (r < 0) return r; diff --git a/runtime/src/main.c b/runtime/src/main.c index a8c9c9d..4e5b5ee 100644 --- a/runtime/src/main.c +++ b/runtime/src/main.c @@ -125,7 +125,7 @@ void start_worker_threads() { for (int i = 0; i < total_worker_processors; i++) { - int ret = pthread_create(&worker_threads[i], NULL, sandbox_worker_main, + int ret = pthread_create(&worker_threads[i], NULL, worker_thread_main, (void *)&worker_threads_argument[i]); if (ret) { errno = ret; diff --git a/runtime/src/module.c b/runtime/src/module.c index 5eb73c3..0f98579 100644 --- a/runtime/src/module.c +++ b/runtime/src/module.c @@ -6,6 +6,10 @@ #include #include +/*************************************** + * Module Database + ***************************************/ + // In-memory representation of all active modules static struct module *__mod_db[MOD_MAX] = { NULL }; // TODO: What is this? @@ -17,7 +21,7 @@ static int __mod_free_off = 0; * @return module or NULL if no match found **/ struct module * -module_find_by_name(char *name) +find_module_by_name(char *name) { int f = __mod_free_off; for (int i = 0; i < f; i++) { @@ -33,7 +37,7 @@ module_find_by_name(char *name) * @return module or NULL if no match found **/ struct module * -module_find_by_socket_descriptor(int socket_descriptor) +find_module_by_socket_descriptor(int socket_descriptor) { int f = __mod_free_off; for (int i = 0; i < f; i++) { @@ -49,7 +53,7 @@ module_find_by_socket_descriptor(int socket_descriptor) * @return 0 on success. Aborts program on failure **/ static inline int -module_add(struct module *module) +add_module(struct module *module) { assert(module->socket_descriptor == -1); @@ -61,12 +65,16 @@ module_add(struct module *module) return 0; } +/*************************************** + * Module "Methods" + ***************************************/ + /** * Start the module as a server listening at module->port * @param module **/ static inline void -module_server_init(struct module *module) +module__initialize_as_server(struct module *module) { // Allocate a new socket int socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); @@ -102,7 +110,30 @@ module_server_init(struct module *module) } /** - * Module Mega Setup Function + * Module Mega Teardown Function + * Closes the socket and dynamic library, and then frees the module + * Returns harmlessly if there are outstanding references + * @param module - the module to teardown + **/ +void +module__free(struct module *module) +{ + if (module == NULL) return; + if (module->dynamic_library_handle == NULL) return; + + // Do not free if we still have oustanding references + if (module->reference_count) return; + + // TODO: What about the module database? Do we need to do any cleanup there? + + close(module->socket_descriptor); + dlclose(module->dynamic_library_handle); + free(module); +} + + +/** + * Module Contructor * Creates a new module, invokes initialize_tables to initialize the indirect table, adds it to the module DB, and starts *listening for HTTP Requests * @@ -117,7 +148,7 @@ module_server_init(struct module *module) * @returns A new module or NULL in case of failure **/ struct module * -module_alloc(char *name, char *path, i32 argument_count, u32 stack_size, u32 max_memory, u32 timeout, int port, +module__new(char *name, char *path, i32 argument_count, u32 stack_size, u32 max_memory, u32 timeout, int port, int request_size, int response_size) { struct module *module = (struct module *)malloc(sizeof(struct module)); @@ -167,21 +198,21 @@ module_alloc(char *name, char *path, i32 argument_count, u32 stack_size, u32 max // assumption: All modules are created at program start before we enable preemption or enable the execution of // any worker threads We are checking that thread-local module_indirect_table is NULL to prove that we aren't - // yet preempting If we want to be able to do this later, we can possibly defer module_table_init until the + // yet preempting If we want to be able to do this later, we can possibly defer module__initialize_table until the // first invocation assert(cache_tbl == NULL); // TODO: determine why we have to set the module_indirect_table state before calling table init and then restore // the existing value What is the relationship between these things? module_indirect_table = module->indirect_table; - module_table_init(module); + module__initialize_table(module); module_indirect_table = cache_tbl; // Add the module to the in-memory module DB - module_add(module); + add_module(module); // Start listening for requests - module_server_init(module); + module__initialize_as_server(module); return module; @@ -194,24 +225,3 @@ dl_open_error: return NULL; } -/** - * Module Mega Teardown Function - * Closes the socket and dynamic library, and then frees the module - * Returns harmlessly if there are outstanding references - * @param module - the module to teardown - **/ -void -module_free(struct module *module) -{ - if (module == NULL) return; - if (module->dynamic_library_handle == NULL) return; - - // Do not free if we still have oustanding references - if (module->reference_count) return; - - // TODO: What about the module database? Do we need to do any cleanup there? - - close(module->socket_descriptor); - dlclose(module->dynamic_library_handle); - free(module); -} diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index 683a9ef..0fbcbf2 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -19,8 +19,117 @@ struct deque_sandbox *global_deque; pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER; int epoll_file_descriptor; +/****************************************** + * Shared Process / Listener Thread Logic * + ******************************************/ + +/** + * Initialize runtime global state, mask signals, and init http parser + */ +void +initialize_runtime(void) +{ + epoll_file_descriptor = epoll_create1(0); + assert(epoll_file_descriptor >= 0); + + // Allocate and Initialize the global deque + global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); + assert(global_deque); + // Note: Below is a Macro + deque_init_sandbox(global_deque, SBOX_MAX_REQS); + + // Mask Signals + softint_mask(SIGUSR1); + softint_mask(SIGALRM); + + // Initialize http-parser + http_init(); +} + +/******************************** + * Listener Thread Logic * + ********************************/ + +/** + * @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the + * sandbox object to the global dequeue + * @param dummy data pointer provided by pthreads API. Unused in this function + * @return NULL + * + * Used Globals: + * epoll_file_descriptor - the epoll file descriptor + * + */ +void * +listener_thread_main(void *dummy) +{ + struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); + int total_requests = 0; + + while (true) { + int request_count = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1); + u64 start_time = rdtsc(); + for (int i = 0; i < request_count; i++) { + if (epoll_events[i].events & EPOLLERR) { + perror("epoll_wait"); + assert(0); + } + + struct sockaddr_in client_address; + socklen_t client_length = sizeof(client_address); + struct module * module = (struct module *)epoll_events[i].data.ptr; + assert(module); + int es = module->socket_descriptor; + int socket_descriptor = accept(es, (struct sockaddr *)&client_address, &client_length); + if (socket_descriptor < 0) { + perror("accept"); + assert(0); + } + total_requests++; + printf("Received Request %d at %lu\n", total_requests, start_time); + + sandbox_request_t *sandbox_request = allocate_sandbox_request( + module, + module->name, + socket_descriptor, + (const struct sockaddr *)&client_address, + start_time); + assert(sandbox_request); + + // TODO: Refactor allocate_sandbox_request to not add to global request queue and do this here + } + } + + free(epoll_events); + + return NULL; +} + +/** + * Initializes the listener thread, pinned to core 0, and starts to listen for requests + */ +void +initialize_listener_thread(void) +{ + cpu_set_t cs; + + CPU_ZERO(&cs); + CPU_SET(MOD_REQ_CORE, &cs); + + pthread_t listener_thread; + int ret = pthread_create(&listener_thread, NULL, listener_thread_main, NULL); + assert(ret == 0); + ret = pthread_setaffinity_np(listener_thread, sizeof(cpu_set_t), &cs); + assert(ret == 0); + ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs); + assert(ret == 0); + + softint_init(); + softint_timer_arm(); +} + /*************************** - * Thread Local State * + * Worker Thread State * **************************/ __thread static struct ps_list_head local_run_queue; @@ -42,8 +151,84 @@ __thread uv_loop_t uvio_handle; // Flag to signify if the thread is currently running callbacks in the libuv event loop static __thread unsigned int in_callback; + +/************************************************** + * Worker Thread Logic + *************************************************/ + static inline void add_sandbox_to_local_run_queue(struct sandbox *sandbox); +/** + * If this sandbox is blocked, mark it as runnable and add to the head of the thread-local runqueue + * @param sandbox the sandbox to check and update if blocked + **/ +void +wakeup_sandbox(sandbox_t *sandbox) +{ + softint_disable(); + debuglog("[%p: %s]\n", sandbox, sandbox->module->name); + if (sandbox->state != BLOCKED) goto done; + assert(sandbox->state == BLOCKED); + assert(ps_list_singleton_d(sandbox)); + sandbox->state = RUNNABLE; + ps_list_head_append_d(&local_run_queue, sandbox); +done: + softint_enable(); +} + + +/** + * Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head of the runqueue + **/ +void +block_current_sandbox(void) +{ + assert(in_callback == 0); + softint_disable(); + struct sandbox *current_sandbox = get_current_sandbox(); + // TODO: What is this getting removed from again? the thread-local runqueue? + ps_list_rem_d(current_sandbox); + current_sandbox->state = BLOCKED; + struct sandbox *next_sandbox = get_next_sandbox_from_local_run_queue(0); + debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : ""); + softint_enable(); + switch_to_sandbox(next_sandbox); +} + + +/** + * TODO: What is this doing? + **/ +void +sandbox_block_http(void) +{ +#ifdef USE_HTTP_UVIO +#ifdef USE_HTTP_SYNC + // realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, not + // great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do + // async block! + uv_run(get_thread_libuv_handle(), UV_RUN_DEFAULT); +#else /* USE_HTTP_SYNC */ + block_current_sandbox(); +#endif /* USE_HTTP_UVIO */ +#else + assert(0); + // it should not be called if not using uvio for http +#endif +} + +/** + * TODO: What is this doing? + **/ +void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void) +{ + pthread_kill(pthread_self(), SIGUSR1); + + assert(0); // should not get here.. + while (true) + ; +} + /** * Pulls up to 1..n sandbox requests, allocates them as sandboxes, sets them as runnable and places them on the local * runqueue, and then frees the sandbox requests The batch size pulled at once is set by SBOX_PULL_MAX @@ -73,16 +258,16 @@ pull_sandbox_requests_from_global_runqueue(void) } /** - * Run all outstanding events in the libuv event loop + * Run all outstanding events in the local thread's libuv event loop **/ void execute_libuv_event_loop(void) { in_callback = 1; - int n = uv_run(runtime_uvio(), UV_RUN_NOWAIT), i = 0; + int n = uv_run(get_thread_libuv_handle(), UV_RUN_NOWAIT), i = 0; while (n > 0) { n--; - uv_run(runtime_uvio(), UV_RUN_NOWAIT); + uv_run(get_thread_libuv_handle(), UV_RUN_NOWAIT); } in_callback = 0; } @@ -171,85 +356,13 @@ free_sandboxes_from_completion_queue(unsigned int number_to_free) } } -/** - * If this sandbox is blocked, mark it as runnable and add to the head of the thread-local runqueue - * @param sandbox the sandbox to check and update if blocked - **/ -void -wakeup_sandbox(sandbox_t *sandbox) -{ - softint_disable(); - debuglog("[%p: %s]\n", sandbox, sandbox->module->name); - if (sandbox->state != BLOCKED) goto done; - assert(sandbox->state == BLOCKED); - assert(ps_list_singleton_d(sandbox)); - sandbox->state = RUNNABLE; - ps_list_head_append_d(&local_run_queue, sandbox); -done: - softint_enable(); -} - - -/** - * Mark the currently executing sandbox as blocked, remove it from the local runqueue, and pull the sandbox at the head of the runqueue - **/ -void -block_current_sandbox(void) -{ - assert(in_callback == 0); - softint_disable(); - struct sandbox *current_sandbox = get_current_sandbox(); - // TODO: What is this getting removed from again? the thread-local runqueue? - ps_list_rem_d(current_sandbox); - current_sandbox->state = BLOCKED; - struct sandbox *next_sandbox = get_next_sandbox_from_local_run_queue(0); - debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", current_sandbox, current_sandbox->module->name, next_sandbox, next_sandbox ? next_sandbox->module->name : ""); - softint_enable(); - switch_to_sandbox(next_sandbox); -} - - -/** - * TODO: What is this doing? - **/ -void -sandbox_block_http(void) -{ -#ifdef USE_HTTP_UVIO -#ifdef USE_HTTP_SYNC - // realistically, we're processing all async I/O on this core when a sandbox blocks on http processing, not - // great! if there is a way (TODO), perhaps RUN_ONCE and check if your I/O is processed, if yes, return else do - // async block! - uv_run(runtime_uvio(), UV_RUN_DEFAULT); -#else /* USE_HTTP_SYNC */ - block_current_sandbox(); -#endif /* USE_HTTP_UVIO */ -#else - assert(0); - // it should not be called if not using uvio for http -#endif -} - - -/** - * TODO: What is this doing? - **/ -void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(void) -{ - pthread_kill(pthread_self(), SIGUSR1); - - assert(0); // should not get here.. - while (true) - ; -} - /** * Tries to free a completed request, executes libuv callbacks, and then gets * and returns the standbox at the head of the thread-local runqueue * @return sandbox or NULL **/ struct sandbox * -sandbox_worker_single_loop(void) +worker_thread_single_loop(void) { assert(get_current_sandbox() == NULL); // Try to free one sandbox from the completion queue @@ -271,7 +384,7 @@ sandbox_worker_single_loop(void) * @param return_code - argument provided by pthread API. We set to -1 on error **/ void * -sandbox_worker_main(void *return_code) +worker_thread_main(void *return_code) { arch_context_init(&base_context, 0, 0); @@ -287,10 +400,10 @@ sandbox_worker_main(void *return_code) in_callback = 0; while (true) { - struct sandbox *sandbox = sandbox_worker_single_loop(); + struct sandbox *sandbox = worker_thread_single_loop(); while (sandbox) { switch_to_sandbox(sandbox); - sandbox = sandbox_worker_single_loop(); + sandbox = worker_thread_single_loop(); } } @@ -298,11 +411,14 @@ sandbox_worker_main(void *return_code) pthread_exit(return_code); } +// RANDOM... Is this in the right place? + /** * Called when the function in the sandbox exits * Removes the standbox from the thread-local runqueue, sets its state to RETURNED, * releases the linear memory, and then switches to the sandbox at the head of the runqueue * TODO: Why are we not adding to the completion queue here? That logic is commented out. + * TODO: Does this belong in sandbox.c? **/ void exit_current_sandbox(void) @@ -321,105 +437,4 @@ exit_current_sandbox(void) munmap(current_sandbox->linear_memory_start, SBOX_MAX_MEM + PAGE_SIZE); // add_sandbox_to_completion_queue(current_sandbox); switch_to_sandbox(next_sandbox); -} - -/** - * @brief Execution Loop of the listener core, handles HTTP requests, allocates sandbox request objects, and pushes the - * sandbox object to the global dequeue - * @param dummy data pointer provided by pthreads API. Unused in this function - * @return NULL - * - * Used Globals: - * epoll_file_descriptor - the epoll file descriptor - * - */ -void * -listener_thread_main(void *dummy) -{ - struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); - int total_requests = 0; - - while (true) { - int request_count = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1); - u64 start_time = rdtsc(); - for (int i = 0; i < request_count; i++) { - if (epoll_events[i].events & EPOLLERR) { - perror("epoll_wait"); - assert(0); - } - - struct sockaddr_in client_address; - socklen_t client_length = sizeof(client_address); - struct module * module = (struct module *)epoll_events[i].data.ptr; - assert(module); - int es = module->socket_descriptor; - int socket_descriptor = accept(es, (struct sockaddr *)&client_address, &client_length); - if (socket_descriptor < 0) { - perror("accept"); - assert(0); - } - total_requests++; - printf("Received Request %d at %lu\n", total_requests, start_time); - - sandbox_request_t *sandbox_request = allocate_sandbox_request( - module, - module->name, - socket_descriptor, - (const struct sockaddr *)&client_address, - start_time); - assert(sandbox_request); - - // TODO: Refactor allocate_sandbox_request to not add to global request queue and do this here - } - } - - free(epoll_events); - - return NULL; -} - -/** - * Initialize runtime global state, mask signals, and init http parser - */ -void -initialize_runtime(void) -{ - epoll_file_descriptor = epoll_create1(0); - assert(epoll_file_descriptor >= 0); - - // Allocate and Initialize the global deque - global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); - assert(global_deque); - // Note: Below is a Macro - deque_init_sandbox(global_deque, SBOX_MAX_REQS); - - // Mask Signals - softint_mask(SIGUSR1); - softint_mask(SIGALRM); - - // Initialize http-parser - http_init(); -} - -/** - * Initializes the listener thread, pinned to core 0, and starts to listen for requests - */ -void -initialize_listener_thread(void) -{ - cpu_set_t cs; - - CPU_ZERO(&cs); - CPU_SET(MOD_REQ_CORE, &cs); - - pthread_t listener_thread; - int ret = pthread_create(&listener_thread, NULL, listener_thread_main, NULL); - assert(ret == 0); - ret = pthread_setaffinity_np(listener_thread, sizeof(cpu_set_t), &cs); - assert(ret == 0); - ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs); - assert(ret == 0); - - softint_init(); - softint_timer_arm(); -} +} \ No newline at end of file diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index 0d5b944..a38a61e 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -127,7 +127,7 @@ allocate_sandbox_memory(struct module *module) sandbox->linear_memory_size = linear_memory_size; sandbox->module = module; sandbox->sandbox_size = sandbox_size; - module_acquire(module); + module__acquire(module); return sandbox; } @@ -284,7 +284,7 @@ sandbox_main(void) softint_enable(); } struct module *current_module = get_sandbox_module(current_sandbox); - int argument_count = module_argument_count(current_module); + int argument_count = module__get_argument_count(current_module); // for stdio // Try to initialize file descriptors 0, 1, and 2 as io handles 0, 1, 2 @@ -309,7 +309,7 @@ sandbox_main(void) #ifdef USE_HTTP_UVIO // Initialize libuv TCP stream - int r = uv_tcp_init(runtime_uvio(), (uv_tcp_t *)¤t_sandbox->client_libuv_stream); + int r = uv_tcp_init(get_thread_libuv_handle(), (uv_tcp_t *)¤t_sandbox->client_libuv_stream); assert(r == 0); // Set the current sandbox as the data the libuv callbacks have access to @@ -329,15 +329,15 @@ sandbox_main(void) // Allocate the WebAssembly Sandbox alloc_linear_memory(); // perhaps only initialized for the first instance? or TODO! - // module_table_init(current_module); - module_globals_init(current_module); - module_memory_init(current_module); + // module__initialize_table(current_module); + module__initialize_globals(current_module); + module__initialize_memory(current_module); // Copy the arguments into the WebAssembly sandbox setup_sandbox_arguments(argument_count); // Executing the function within the WebAssembly sandbox - current_sandbox->return_value = module_entry(current_module, argument_count, current_sandbox->arguments_offset); + current_sandbox->return_value = module__main(current_module, argument_count, current_sandbox->arguments_offset); // Retrieve the result from the WebAssembly sandbox, construct the HTTP response, and send to client build_and_send_current_sandbox_client_response(); @@ -357,7 +357,7 @@ sandbox_main(void) struct sandbox * allocate_sandbox(struct module *module, char *arguments, int socket_descriptor, const struct sockaddr *socket_address, u64 start_time) { - if (!module_is_valid(module)) return NULL; + if (!module__is_valid(module)) return NULL; // FIXME: don't use malloc. huge security problem! // perhaps, main should be in its own sandbox, when it is not running any sandbox. @@ -401,7 +401,7 @@ free_sandbox(struct sandbox *sandbox) int sz = sizeof(struct sandbox); sz += sandbox->module->max_request_or_response_size; - module_release(sandbox->module); + module__release(sandbox->module); // TODO free(sandbox->arguments); void * stkaddr = sandbox->stack_start; diff --git a/runtime/src/softint.c b/runtime/src/softint.c index d845fe5..2993e81 100644 --- a/runtime/src/softint.c +++ b/runtime/src/softint.c @@ -102,7 +102,7 @@ extern pthread_t worker_threads[]; /** * The handler function for Software Interrupts (signals) * SIGALRM is executed periodically by an interval timer, causing preemption of the current sandbox - * SIGUSR1 does ?????? + * SIGUSR1 does TODO: ???TODO: ??? * @param signal_type * @param signal_info data structure containing signal info * @param user_context_raw void* to a user_context struct diff --git a/runtime/src/util.c b/runtime/src/util.c index 780d46a..77704f7 100644 --- a/runtime/src/util.c +++ b/runtime/src/util.c @@ -162,10 +162,10 @@ util_parse_modules_file_json(char *file_name) if (is_active == 0) continue; // Allocate a module based on the values from the JSON - struct module *module = module_alloc(module_name, module_path, argument_count, 0, 0, 0, port, + struct module *module = module__new(module_name, module_path, argument_count, 0, 0, 0, port, request_size, response_size); assert(module); - module_http_info(module, request_count, request_headers, request_content_type, response_count, + module__set_http_info(module, request_count, request_headers, request_content_type, response_count, reponse_headers, response_content_type); module_count++; free(request_headers);