diff --git a/runtime/include/admissions_control.h b/runtime/include/admissions_control.h index b33e896..b71f13a 100644 --- a/runtime/include/admissions_control.h +++ b/runtime/include/admissions_control.h @@ -1,125 +1,14 @@ #pragma once -#include #include #include -#include - -#include "debuglog.h" -#include "client_socket.h" #define ADMISSIONS_CONTROL_GRANULARITY 1000000 -/* - * Unitless estimate of the instantaneous fraction of system capacity required to complete all previously - * admitted work. This is used to calculate free capacity as part of admissions control - * - * The estimated requirements of a single admitted request is calculated as - * estimated execution time (cycles) / relative deadline (cycles) - * - * These estimates are incremented on request acceptance and decremented on request completion (either - * success or failure) - */ -extern _Atomic uint64_t admissions_control_admitted; -extern uint64_t admissions_control_capacity; -extern const double admissions_control_overhead; - -static inline void -admissions_control_initialize() -{ -#ifdef ADMISSIONS_CONTROL - atomic_init(&admissions_control_admitted, 0); - admissions_control_capacity = runtime_worker_threads_count * ADMISSIONS_CONTROL_GRANULARITY - * ((double)1.0 - admissions_control_overhead); -#endif -} - -static inline void -admissions_control_add(uint64_t admissions_estimate) -{ -#ifdef ADMISSIONS_CONTROL - assert(admissions_estimate > 0); - atomic_fetch_add(&admissions_control_admitted, admissions_estimate); - -#ifdef LOG_ADMISSIONS_CONTROL - debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity); -#endif - -#endif /* ADMISSIONS_CONTROL */ -} - -static inline void -admissions_control_subtract(uint64_t admissions_estimate) -{ -#ifdef ADMISSIONS_CONTROL - /* Assumption: Should never underflow */ - if (unlikely(admissions_estimate > admissions_control_admitted)) panic("Admissions Estimate underflow\n"); - - atomic_fetch_sub(&admissions_control_admitted, admissions_estimate); - -#ifdef LOG_ADMISSIONS_CONTROL - debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity); -#endif - -#endif /* ADMISSIONS_CONTROL */ -} - - -static inline uint64_t -admissions_control_calculate_estimate(uint64_t estimated_execution, uint64_t relative_deadline) -{ -#ifdef ADMISSIONS_CONTROL - assert(relative_deadline != 0); - uint64_t admissions_estimate = (estimated_execution * (uint64_t)ADMISSIONS_CONTROL_GRANULARITY) - / relative_deadline; - if (admissions_estimate == 0) - panic("Ratio of Deadline to Execution time cannot exceed %d\n", ADMISSIONS_CONTROL_GRANULARITY); - - return admissions_estimate; -#else - return 0; -#endif -} - -static inline uint64_t -admissions_control_calculate_estimate_us(uint32_t estimated_execution_us, uint32_t relative_deadline_us) -{ -#ifdef ADMISSIONS_CONTROL - assert(relative_deadline_us != 0); - return (uint64_t)((uint64_t)(estimated_execution_us * ADMISSIONS_CONTROL_GRANULARITY)) / relative_deadline_us; -#else - return 0; -#endif -} - -static inline void -admissions_control_log_decision(uint64_t admissions_estimate, bool admitted) -{ -#ifdef LOG_ADMISSIONS_CONTROL - debuglog("Admitted: %lu, Capacity: %lu, Estimate: %lu, Admitted? %s\n", admissions_control_admitted, - admissions_control_capacity, admissions_estimate, admitted ? "yes" : "no"); -#endif /* LOG_ADMISSIONS_CONTROL */ -} - -static inline uint64_t -admissions_control_decide(uint64_t admissions_estimate) -{ - uint64_t work_admitted = 1; /* Nominal non-zero value in case admissions control is disabled */ - -#ifdef ADMISSIONS_CONTROL - if (unlikely(admissions_estimate == 0)) panic("Admissions estimate should never be zero"); - - uint64_t total_admitted = atomic_load(&admissions_control_admitted); - - if (total_admitted + admissions_estimate >= admissions_control_capacity) { - admissions_control_log_decision(admissions_estimate, false); - work_admitted = 0; - } else { - admissions_control_log_decision(admissions_estimate, true); - admissions_control_add(admissions_estimate); - work_admitted = admissions_estimate; - } -#endif /* ADMISSIONS_CONTROL */ - - return work_admitted; -} +void admissions_control_initialize(); +void admissions_control_add(uint64_t admissions_estimate); +void admissions_control_subtract(uint64_t admissions_estimate); +uint64_t admissions_control_calculate_estimate(uint64_t estimated_execution, uint64_t relative_deadline); +uint64_t admissions_control_calculate_estimate_us(uint32_t estimated_execution_us, uint32_t relative_deadline_us); +void admissions_control_log_decision(uint64_t admissions_estimate, bool admitted); +uint64_t admissions_control_decide(uint64_t admissions_estimate); diff --git a/runtime/include/admissions_info.h b/runtime/include/admissions_info.h index e239236..d1557ba 100644 --- a/runtime/include/admissions_info.h +++ b/runtime/include/admissions_info.h @@ -1,6 +1,5 @@ #pragma once -#include "debuglog.h" #include "perf_window.h" struct admissions_info { @@ -11,51 +10,6 @@ struct admissions_info { uint64_t relative_deadline; /* Relative deadline in cycles. This is duplicated state */ }; -/** - * Initializes perf window - * @param self - */ -static inline void -admissions_info_initialize(struct admissions_info *self, int percentile, uint64_t expected_execution, - uint64_t relative_deadline) -{ -#ifdef ADMISSIONS_CONTROL - assert(relative_deadline > 0); - assert(expected_execution > 0); - self->relative_deadline = relative_deadline; - self->estimate = admissions_control_calculate_estimate(expected_execution, relative_deadline); - debuglog("Initial Estimate: %lu\n", self->estimate); - assert(self != NULL); - - perf_window_initialize(&self->perf_window); - - if (unlikely(percentile < 50 || percentile > 99)) panic("Invalid admissions percentile"); - self->percentile = percentile; - - self->control_index = PERF_WINDOW_BUFFER_SIZE * percentile / 100; -#ifdef LOG_ADMISSIONS_CONTROL - debuglog("Percentile: %d\n", self->percentile); - debuglog("Control Index: %d\n", self->control_index); -#endif -#endif -} - -/* - * Adds an execution value to the perf window and calculates and caches and updated estimate - * @param self - * @param execution_duration - */ -static inline void -admissions_info_update(struct admissions_info *self, uint64_t execution_duration) -{ -#ifdef ADMISSIONS_CONTROL - assert(!software_interrupt_is_enabled()); - struct perf_window *perf_window = &self->perf_window; - - LOCK_LOCK(&self->perf_window.lock); - perf_window_add(perf_window, execution_duration); - uint64_t estimated_execution = perf_window_get_percentile(perf_window, self->percentile, self->control_index); - self->estimate = admissions_control_calculate_estimate(estimated_execution, self->relative_deadline); - LOCK_UNLOCK(&self->perf_window.lock); -#endif -} +void admissions_info_initialize(struct admissions_info *self, int percentile, uint64_t expected_execution, + uint64_t relative_deadline); +void admissions_info_update(struct admissions_info *self, uint64_t execution_duration); diff --git a/runtime/include/client_socket.h b/runtime/include/client_socket.h index bab817a..974d04e 100644 --- a/runtime/include/client_socket.h +++ b/runtime/include/client_socket.h @@ -1,81 +1,7 @@ #pragma once -#include -#include -#include -#include +#include -#include "panic.h" -#include "debuglog.h" -#include "http_response.h" -#include "http_total.h" -#include "runtime.h" -#include "worker_thread.h" +void client_socket_close(int client_socket, struct sockaddr *client_address); - -static inline void -client_socket_close(int client_socket, struct sockaddr *client_address) -{ - /* Should never close 0, 1, or 2 */ - assert(client_socket != STDIN_FILENO); - assert(client_socket != STDOUT_FILENO); - assert(client_socket != STDERR_FILENO); - - - if (unlikely(close(client_socket) < 0)) { - char client_address_text[INET6_ADDRSTRLEN] = {}; - if (unlikely(inet_ntop(AF_INET, &client_address, client_address_text, INET6_ADDRSTRLEN) == NULL)) { - debuglog("Failed to log client_address: %s", strerror(errno)); - } - debuglog("Error closing client socket %d associated with %s - %s", client_socket, client_address_text, - strerror(errno)); - } -} - - -/** - * Rejects request due to admission control or error - * @param client_socket - the client we are rejecting - * @param status_code - either 503 or 400 - */ -static inline int -client_socket_send(int client_socket, int status_code) -{ - const char *response; - int rc; - switch (status_code) { - case 503: - response = HTTP_RESPONSE_503_SERVICE_UNAVAILABLE; - http_total_increment_5XX(); - break; - case 400: - response = HTTP_RESPONSE_400_BAD_REQUEST; - http_total_increment_4XX(); - break; - default: - panic("%d is not a valid status code\n", status_code); - } - - int sent = 0; - int to_send = strlen(response); - - while (sent < to_send) { - rc = write(client_socket, &response[sent], to_send - sent); - if (rc < 0) { - if (errno == EAGAIN) { debuglog("Unexpectedly blocking on write of %s\n", response); } - - debuglog("Error with %s\n", strerror(errno)); - - goto send_err; - } - sent += rc; - }; - - rc = 0; -done: - return rc; -send_err: - debuglog("Error sending to client: %s", strerror(errno)); - rc = -1; - goto done; -} +int client_socket_send(int client_socket, int status_code); diff --git a/runtime/include/current_sandbox.h b/runtime/include/current_sandbox.h index c469cff..668f8ec 100644 --- a/runtime/include/current_sandbox.h +++ b/runtime/include/current_sandbox.h @@ -2,9 +2,5 @@ #include "sandbox.h" -void current_sandbox_close_file_descriptor(int io_handle_index); struct sandbox *current_sandbox_get(void); -int current_sandbox_get_file_descriptor(int io_handle_index); -int current_sandbox_initialize_io_handle(void); void current_sandbox_set(struct sandbox *sandbox); -int current_sandbox_set_file_descriptor(int io_handle_index, int file_descriptor); diff --git a/runtime/include/generic_thread.h b/runtime/include/generic_thread.h index a600e58..135b68e 100644 --- a/runtime/include/generic_thread.h +++ b/runtime/include/generic_thread.h @@ -2,25 +2,8 @@ #include -#include "arch/getcycles.h" -#include "debuglog.h" - extern __thread uint64_t generic_thread_lock_duration; extern __thread uint64_t generic_thread_start_timestamp; -/** - * Reports lock contention - */ -static inline void -generic_thread_dump_lock_overhead() -{ -#ifndef NDEBUG -#ifdef LOG_LOCK_OVERHEAD - uint64_t duration = __getcycles() - generic_thread_start_timestamp; - debuglog("Locks consumed %lu / %lu cycles, or %f%%\n", generic_thread_lock_duration, duration, - (double)generic_thread_lock_duration / duration * 100); -#endif -#endif -} - +void generic_thread_dump_lock_overhead(); void generic_thread_initialize(); diff --git a/runtime/include/http_request.h b/runtime/include/http_request.h index f7d0e12..21e4146 100644 --- a/runtime/include/http_request.h +++ b/runtime/include/http_request.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include "http.h" @@ -28,22 +27,5 @@ struct http_request { bool message_end; /* boolean flag set when body processing is complete */ }; -static inline void -http_request_print(struct http_request *self) -{ - printf("Header Count %d\n", self->header_count); - printf("Header Content:\n"); - for (int i = 0; i < self->header_count; i++) { - for (int j = 0; j < self->headers[i].key_length; j++) { putchar(self->headers[i].key[j]); } - putchar(':'); - for (int j = 0; j < self->headers[i].value_length; j++) { putchar(self->headers[i].value[j]); } - putchar('\n'); - } - printf("Body Length %d\n", self->body_length); - printf("Body Read Length %d\n", self->body_read_length); -} - -/*************************************************** - * General HTTP Request Functions * - **************************************************/ -int http_request_get_body(struct http_request *http_request, char **body); +int http_request_get_body(struct http_request *http_request, char **body); +void http_request_print(struct http_request *self); diff --git a/runtime/include/http_total.h b/runtime/include/http_total.h index 2f74cad..d7ee1cb 100644 --- a/runtime/include/http_total.h +++ b/runtime/include/http_total.h @@ -17,41 +17,8 @@ extern _Atomic uint32_t http_total_2XX; extern _Atomic uint32_t http_total_4XX; #endif -static inline void -http_total_init() -{ - atomic_init(&http_total_requests, 0); - atomic_init(&http_total_5XX, 0); -#ifdef LOG_TOTAL_REQS_RESPS - atomic_init(&http_total_2XX, 0); - atomic_init(&http_total_4XX, 0); -#endif -} - -static inline void -http_total_increment_request() -{ - atomic_fetch_add(&http_total_requests, 1); -} - -static inline void -http_total_increment_2xx() -{ -#ifdef LOG_TOTAL_REQS_RESPS - atomic_fetch_add(&http_total_2XX, 1); -#endif -} - -static inline void -http_total_increment_4XX() -{ -#ifdef LOG_TOTAL_REQS_RESPS - atomic_fetch_add(&http_total_4XX, 1); -#endif -} - -static inline void -http_total_increment_5XX() -{ - atomic_fetch_add(&http_total_5XX, 1); -} +void http_total_init(); +void http_total_increment_request(); +void http_total_increment_2xx(); +void http_total_increment_4XX(); +void http_total_increment_5XX(); diff --git a/runtime/include/listener_thread.h b/runtime/include/listener_thread.h index 26d5962..a00a4f4 100644 --- a/runtime/include/listener_thread.h +++ b/runtime/include/listener_thread.h @@ -1,10 +1,24 @@ #pragma once +#include + #include "generic_thread.h" #include "module.h" #define LISTENER_THREAD_CORE_ID 0 +extern pthread_t listener_thread_id; + void listener_thread_initialize(void); __attribute__((noreturn)) void *listener_thread_main(void *dummy); int listener_thread_register_module(struct module *mod); + +/** + * Used to determine if running in the context of a listener thread + * @returns true if listener. false if not (probably a worker) + */ +static inline bool +listener_thread_is_running() +{ + return pthread_self() == listener_thread_id; +} diff --git a/runtime/include/local_runqueue.h b/runtime/include/local_runqueue.h index f0714dc..abbf840 100644 --- a/runtime/include/local_runqueue.h +++ b/runtime/include/local_runqueue.h @@ -19,11 +19,9 @@ struct local_runqueue_config { local_runqueue_preempt_fn_t preempt_fn; }; -void local_runqueue_initialize(struct local_runqueue_config *config); - -/* This is currently only used by worker_thread_wakeup_sandbox */ void local_runqueue_add(struct sandbox *); void local_runqueue_delete(struct sandbox *); bool local_runqueue_is_empty(); struct sandbox *local_runqueue_get_next(); +void local_runqueue_initialize(struct local_runqueue_config *config); void local_runqueue_preempt(ucontext_t *); diff --git a/runtime/include/lock.h b/runtime/include/lock.h index d21677a..608d924 100644 --- a/runtime/include/lock.h +++ b/runtime/include/lock.h @@ -28,7 +28,7 @@ typedef ck_spinlock_mcs_t lock_t; * @param unique_variable_name - a unique prefix to hygienically namespace an associated lock/unlock pair */ #define LOCK_LOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \ - assert(!runtime_is_worker() || !software_interrupt_is_enabled()); \ + assert(listener_thread_is_running() || !software_interrupt_is_enabled()); \ struct ck_spinlock_mcs _hygiene_##unique_variable_name##_node; \ uint64_t _hygiene_##unique_variable_name##_pre = __getcycles(); \ ck_spinlock_mcs_lock((lock), &(_hygiene_##unique_variable_name##_node)); \ @@ -39,8 +39,8 @@ typedef ck_spinlock_mcs_t lock_t; * @param lock - the address of the lock * @param unique_variable_name - a unique prefix to hygienically namespace an associated lock/unlock pair */ -#define LOCK_UNLOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \ - assert(!runtime_is_worker() || !software_interrupt_is_enabled()); \ +#define LOCK_UNLOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \ + assert(listener_thread_is_running() || !software_interrupt_is_enabled()); \ ck_spinlock_mcs_unlock(lock, &(_hygiene_##unique_variable_name##_node)); /** diff --git a/runtime/include/module.h b/runtime/include/module.h index 87cd934..69c7ee7 100644 --- a/runtime/include/module.h +++ b/runtime/include/module.h @@ -207,29 +207,6 @@ module_release(struct module *module) return; } -/** - * Sets the HTTP Request and Response Headers and Content type on a module - * @param module - * @param request_count - * @param request_headers - * @param request_content_type - * @param response_count - * @param response_headers - * @param response_content_type - */ -static inline void -module_set_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[], - int response_count, char *response_headers, char response_content_type[]) -{ - assert(module); - module->request_header_count = request_count; - memcpy(module->request_headers, request_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT); - strcpy(module->request_content_type, request_content_type); - module->response_header_count = response_count; - memcpy(module->response_headers, response_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT); - strcpy(module->response_content_type, response_content_type); -} - /******************************** * Public Methods from module.c * *******************************/ diff --git a/runtime/include/module_database.h b/runtime/include/module_database.h index e973399..7b61345 100644 --- a/runtime/include/module_database.h +++ b/runtime/include/module_database.h @@ -1,36 +1,7 @@ #pragma once -#include - -#include "panic.h" #include "module.h" +int module_database_add(struct module *module); struct module *module_database_find_by_name(char *name); struct module *module_database_find_by_socket_descriptor(int socket_descriptor); - -extern struct module *module_database[]; -extern size_t module_database_count; - -/** - * Adds a module to the in-memory module DB - * @param module module to add - * @return 0 on success. -ENOSPC when full - */ -static inline int -module_database_add(struct module *module) -{ - assert(module_database_count <= MODULE_MAX_MODULE_COUNT); - - int rc; - - if (module_database_count == MODULE_MAX_MODULE_COUNT) goto err_no_space; - module_database[module_database_count++] = module; - - rc = 0; -done: - return rc; -err_no_space: - panic("Cannot add module. Database is full.\n"); - rc = -ENOSPC; - goto done; -} diff --git a/runtime/include/runtime.h b/runtime/include/runtime.h index 4253faa..81cfbd2 100644 --- a/runtime/include/runtime.h +++ b/runtime/include/runtime.h @@ -71,21 +71,6 @@ extern void runtime_initialize(void); extern void runtime_set_resource_limits_to_max(); extern void stub_init(int32_t offset); -/** - * Used to determine if running in the context of a worker thread - * @returns true if worker. false if listener core - */ -static inline bool -runtime_is_worker() -{ - pthread_t self = pthread_self(); - for (int i = 0; i < runtime_worker_threads_count; i++) { - if (runtime_worker_threads[i] == self) return true; - } - - return false; -} - static inline char * runtime_print_scheduler(enum RUNTIME_SCHEDULER variant) { diff --git a/runtime/include/sandbox.h b/runtime/include/sandbox.h index fbefc5f..ec2330f 100644 --- a/runtime/include/sandbox.h +++ b/runtime/include/sandbox.h @@ -103,15 +103,6 @@ struct sandbox { char request_response_data[1]; /* of request_response_data_length, following sandbox mem.. */ } PAGE_ALIGNED; -/*************************** - * Externs * - **************************/ - -extern void worker_thread_block_current_sandbox(void); -extern void worker_thread_on_sandbox_exit(struct sandbox *sandbox); -extern void worker_thread_process_io(void); -extern void worker_thread_wakeup_sandbox(struct sandbox *sandbox); - /*************************** * Public API * **************************/ @@ -120,7 +111,6 @@ struct sandbox *sandbox_allocate(struct sandbox_request *sandbox_request); void sandbox_free(struct sandbox *sandbox); void sandbox_free_linear_memory(struct sandbox *sandbox); void sandbox_main(struct sandbox *sandbox); -size_t sandbox_parse_http_request(struct sandbox *sandbox, size_t length); /** @@ -165,26 +155,6 @@ sandbox_initialize_io_handle(struct sandbox *sandbox) return io_handle_index; } - -/** - * Initializes and returns an IO handle of a sandbox ready for use - * @param sandbox - * @param file_descriptor what we'll set on the IO handle after initialization - * @return index of handle we preopened or -1 if all io_handles are exhausted - */ -static inline int -sandbox_initialize_io_handle_and_set_file_descriptor(struct sandbox *sandbox, int file_descriptor) -{ - if (!sandbox) return -1; - if (file_descriptor < 0) return file_descriptor; - int io_handle_index = sandbox_initialize_io_handle(sandbox); - if (io_handle_index != -1) { - sandbox->io_handles[io_handle_index].file_descriptor = - file_descriptor; /* per sandbox, so synchronization necessary! */ - } - return io_handle_index; -} - /** * Sets the file descriptor of the sandbox's ith io_handle * Returns error condition if the file_descriptor to set does not contain sandbox preopen magic @@ -232,74 +202,9 @@ sandbox_close_file_descriptor(struct sandbox *sandbox, int io_handle_index) sandbox->io_handles[io_handle_index].file_descriptor = -1; } -/** - * Prints key performance metrics for a sandbox to runtime_sandbox_perf_log - * This is defined by an environment variable - * @param sandbox - */ -static inline void -sandbox_print_perf(struct sandbox *sandbox) -{ - /* If the log was not defined by an environment variable, early out */ - if (runtime_sandbox_perf_log == NULL) return; - - uint32_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz; - uint32_t queued_us = (sandbox->allocation_timestamp - sandbox->request_arrival_timestamp) - / runtime_processor_speed_MHz; - uint32_t initializing_us = sandbox->initializing_duration / runtime_processor_speed_MHz; - uint32_t runnable_us = sandbox->runnable_duration / runtime_processor_speed_MHz; - uint32_t running_us = sandbox->running_duration / runtime_processor_speed_MHz; - uint32_t blocked_us = sandbox->blocked_duration / runtime_processor_speed_MHz; - uint32_t returned_us = sandbox->returned_duration / runtime_processor_speed_MHz; - - /* - * Assumption: A sandbox is never able to free pages. If linear memory management - * becomes more intelligent, then peak linear memory size needs to be tracked - * seperately from current linear memory size. - */ - fprintf(runtime_sandbox_perf_log, "%lu,%s():%d,%s,%u,%u,%u,%u,%u,%u,%u,%u,%u\n", sandbox->id, - sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state), - sandbox->module->relative_deadline_us, total_time_us, queued_us, initializing_us, runnable_us, - running_us, blocked_us, returned_us, sandbox->linear_memory_size); -} - -static inline void -sandbox_summarize_page_allocations(struct sandbox *sandbox) -{ -#ifdef LOG_SANDBOX_MEMORY_PROFILE - // TODO: Handle interleavings - char sandbox_page_allocations_log_path[100] = {}; - sandbox_page_allocations_log_path[99] = '\0'; - snprintf(sandbox_page_allocations_log_path, 99, "%s_%d_page_allocations.csv", sandbox->module->name, - sandbox->module->port); - - debuglog("Logging to %s", sandbox_page_allocations_log_path); - - FILE *sandbox_page_allocations_log = fopen(sandbox_page_allocations_log_path, "a"); - - fprintf(sandbox_page_allocations_log, "%lu,%lu,%s,", sandbox->id, sandbox->running_duration, - sandbox_state_stringify(sandbox->state)); - for (size_t i = 0; i < sandbox->page_allocation_timestamps_size; i++) - fprintf(sandbox_page_allocations_log, "%u,", sandbox->page_allocation_timestamps[i]); - - fprintf(sandbox_page_allocations_log, "\n"); -#else - return; -#endif -} - - -static inline void -sandbox_close_http(struct sandbox *sandbox) -{ - assert(sandbox != NULL); - - int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, sandbox->client_socket_descriptor, NULL); - if (unlikely(rc < 0)) panic_err(); - - client_socket_close(sandbox->client_socket_descriptor, &sandbox->client_address); -} - +void sandbox_close_http(struct sandbox *sandbox); +void sandbox_print_perf(struct sandbox *sandbox); +void sandbox_summarize_page_allocations(struct sandbox *sandbox); INLINE void sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sandbox_request, uint64_t allocation_timestamp); diff --git a/runtime/include/worker_thread.h b/runtime/include/worker_thread.h index 99beb47..92f064f 100644 --- a/runtime/include/worker_thread.h +++ b/runtime/include/worker_thread.h @@ -47,3 +47,6 @@ worker_thread_get_memory_string(uint32_t offset, uint32_t max_length) } return NULL; } + +void worker_thread_block_current_sandbox(void); +__attribute__((noreturn)) void worker_thread_on_sandbox_exit(); diff --git a/runtime/src/admissions_control.c b/runtime/src/admissions_control.c index 275bee3..90f9705 100644 --- a/runtime/src/admissions_control.c +++ b/runtime/src/admissions_control.c @@ -1,6 +1,120 @@ +#include +#include + #include "admissions_control.h" +#include "debuglog.h" +#include "client_socket.h" +/* + * Unitless estimate of the instantaneous fraction of system capacity required to complete all previously + * admitted work. This is used to calculate free capacity as part of admissions control + * + * The estimated requirements of a single admitted request is calculated as + * estimated execution time (cycles) / relative deadline (cycles) + * + * These estimates are incremented on request acceptance and decremented on request completion (either + * success or failure) + */ _Atomic uint64_t admissions_control_admitted; uint64_t admissions_control_capacity; const double admissions_control_overhead = 0.2; + +void +admissions_control_initialize() +{ +#ifdef ADMISSIONS_CONTROL + atomic_init(&admissions_control_admitted, 0); + admissions_control_capacity = runtime_worker_threads_count * ADMISSIONS_CONTROL_GRANULARITY + * ((double)1.0 - admissions_control_overhead); +#endif +} + +void +admissions_control_add(uint64_t admissions_estimate) +{ +#ifdef ADMISSIONS_CONTROL + assert(admissions_estimate > 0); + atomic_fetch_add(&admissions_control_admitted, admissions_estimate); + +#ifdef LOG_ADMISSIONS_CONTROL + debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity); +#endif + +#endif /* ADMISSIONS_CONTROL */ +} + +void +admissions_control_subtract(uint64_t admissions_estimate) +{ +#ifdef ADMISSIONS_CONTROL + /* Assumption: Should never underflow */ + if (unlikely(admissions_estimate > admissions_control_admitted)) panic("Admissions Estimate underflow\n"); + + atomic_fetch_sub(&admissions_control_admitted, admissions_estimate); + +#ifdef LOG_ADMISSIONS_CONTROL + debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity); +#endif + +#endif /* ADMISSIONS_CONTROL */ +} + +uint64_t +admissions_control_calculate_estimate(uint64_t estimated_execution, uint64_t relative_deadline) +{ +#ifdef ADMISSIONS_CONTROL + assert(relative_deadline != 0); + uint64_t admissions_estimate = (estimated_execution * (uint64_t)ADMISSIONS_CONTROL_GRANULARITY) + / relative_deadline; + if (admissions_estimate == 0) + panic("Ratio of Deadline to Execution time cannot exceed %d\n", ADMISSIONS_CONTROL_GRANULARITY); + + return admissions_estimate; +#else + return 0; +#endif +} + +uint64_t +admissions_control_calculate_estimate_us(uint32_t estimated_execution_us, uint32_t relative_deadline_us) +{ +#ifdef ADMISSIONS_CONTROL + assert(relative_deadline_us != 0); + return (uint64_t)((uint64_t)(estimated_execution_us * ADMISSIONS_CONTROL_GRANULARITY)) / relative_deadline_us; +#else + return 0; +#endif +} + +void +admissions_control_log_decision(uint64_t admissions_estimate, bool admitted) +{ +#ifdef LOG_ADMISSIONS_CONTROL + debuglog("Admitted: %lu, Capacity: %lu, Estimate: %lu, Admitted? %s\n", admissions_control_admitted, + admissions_control_capacity, admissions_estimate, admitted ? "yes" : "no"); +#endif /* LOG_ADMISSIONS_CONTROL */ +} + +uint64_t +admissions_control_decide(uint64_t admissions_estimate) +{ + uint64_t work_admitted = 1; /* Nominal non-zero value in case admissions control is disabled */ + +#ifdef ADMISSIONS_CONTROL + if (unlikely(admissions_estimate == 0)) panic("Admissions estimate should never be zero"); + + uint64_t total_admitted = atomic_load(&admissions_control_admitted); + + if (total_admitted + admissions_estimate >= admissions_control_capacity) { + admissions_control_log_decision(admissions_estimate, false); + work_admitted = 0; + } else { + admissions_control_log_decision(admissions_estimate, true); + admissions_control_add(admissions_estimate); + work_admitted = admissions_estimate; + } +#endif /* ADMISSIONS_CONTROL */ + + return work_admitted; +} diff --git a/runtime/src/admissions_info.c b/runtime/src/admissions_info.c new file mode 100644 index 0000000..a705fcc --- /dev/null +++ b/runtime/src/admissions_info.c @@ -0,0 +1,52 @@ +#include "admissions_info.h" +#include "debuglog.h" + +/** + * Initializes perf window + * @param self + */ +void +admissions_info_initialize(struct admissions_info *self, int percentile, uint64_t expected_execution, + uint64_t relative_deadline) +{ +#ifdef ADMISSIONS_CONTROL + assert(relative_deadline > 0); + assert(expected_execution > 0); + self->relative_deadline = relative_deadline; + self->estimate = admissions_control_calculate_estimate(expected_execution, relative_deadline); + debuglog("Initial Estimate: %lu\n", self->estimate); + assert(self != NULL); + + perf_window_initialize(&self->perf_window); + + if (unlikely(percentile < 50 || percentile > 99)) panic("Invalid admissions percentile"); + self->percentile = percentile; + + self->control_index = PERF_WINDOW_BUFFER_SIZE * percentile / 100; +#ifdef LOG_ADMISSIONS_CONTROL + debuglog("Percentile: %d\n", self->percentile); + debuglog("Control Index: %d\n", self->control_index); +#endif +#endif +} + + +/* + * Adds an execution value to the perf window and calculates and caches and updated estimate + * @param self + * @param execution_duration + */ +void +admissions_info_update(struct admissions_info *self, uint64_t execution_duration) +{ +#ifdef ADMISSIONS_CONTROL + assert(!software_interrupt_is_enabled()); + struct perf_window *perf_window = &self->perf_window; + + LOCK_LOCK(&self->perf_window.lock); + perf_window_add(perf_window, execution_duration); + uint64_t estimated_execution = perf_window_get_percentile(perf_window, self->percentile, self->control_index); + self->estimate = admissions_control_calculate_estimate(estimated_execution, self->relative_deadline); + LOCK_UNLOCK(&self->perf_window.lock); +#endif +} diff --git a/runtime/src/client_socket.c b/runtime/src/client_socket.c new file mode 100644 index 0000000..f545533 --- /dev/null +++ b/runtime/src/client_socket.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include + +#include "client_socket.h" +#include "debuglog.h" +#include "http_response.h" +#include "http_total.h" +#include "likely.h" +#include "panic.h" +#include "worker_thread.h" + +void +client_socket_close(int client_socket, struct sockaddr *client_address) +{ + /* Should never close 0, 1, or 2 */ + assert(client_socket != STDIN_FILENO); + assert(client_socket != STDOUT_FILENO); + assert(client_socket != STDERR_FILENO); + + if (unlikely(close(client_socket) < 0)) { + char client_address_text[INET6_ADDRSTRLEN] = {}; + if (unlikely(inet_ntop(AF_INET, &client_address, client_address_text, INET6_ADDRSTRLEN) == NULL)) { + debuglog("Failed to log client_address: %s", strerror(errno)); + } + debuglog("Error closing client socket %d associated with %s - %s", client_socket, client_address_text, + strerror(errno)); + } +} + +/** + * Rejects request due to admission control or error + * @param client_socket - the client we are rejecting + * @param status_code - either 503 or 400 + */ +int +client_socket_send(int client_socket, int status_code) +{ + const char *response; + int rc; + switch (status_code) { + case 503: + response = HTTP_RESPONSE_503_SERVICE_UNAVAILABLE; + http_total_increment_5XX(); + break; + case 400: + response = HTTP_RESPONSE_400_BAD_REQUEST; + http_total_increment_4XX(); + break; + default: + panic("%d is not a valid status code\n", status_code); + } + + int sent = 0; + int to_send = strlen(response); + + while (sent < to_send) { + rc = write(client_socket, &response[sent], to_send - sent); + if (rc < 0) { + if (errno == EAGAIN) { debuglog("Unexpectedly blocking on write of %s\n", response); } + + debuglog("Error with %s\n", strerror(errno)); + + goto send_err; + } + sent += rc; + }; + + rc = 0; +done: + return rc; +send_err: + debuglog("Error sending to client: %s", strerror(errno)); + rc = -1; + goto done; +} diff --git a/runtime/src/current_sandbox.c b/runtime/src/current_sandbox.c index b5aa137..296e563 100644 --- a/runtime/src/current_sandbox.c +++ b/runtime/src/current_sandbox.c @@ -43,51 +43,3 @@ current_sandbox_set(struct sandbox *sandbox) worker_thread_current_sandbox = sandbox; } } - -/** - * Initializes and returns an IO handle on the current sandbox ready for use - * @return index of handle we preopened or -1 if all io_handles are exhausted - */ -int -current_sandbox_initialize_io_handle(void) -{ - return sandbox_initialize_io_handle(current_sandbox_get()); -} - -size_t sandbox_parse_http_request(struct sandbox *sandbox, size_t l); - -/** - * Sets the file descriptor of the sandbox's ith io_handle - * Returns error condition if the file_descriptor to set does not contain sandbox preopen magin - * @param io_handle_index index of the sandbox io_handle we want to set - * @param file_descriptor the file descripter we want to set it to - * @returns the index that was set or -1 in case of error - */ -int -current_sandbox_set_file_descriptor(int io_handle_index, int file_descriptor) -{ - return sandbox_set_file_descriptor(current_sandbox_get(), io_handle_index, file_descriptor); -} - -/** - * Get the file descriptor of the sandbox's ith io_handle - * @param io_handle_index index into the sandbox's io_handles table - * @returns file descriptor - */ -int -current_sandbox_get_file_descriptor(int io_handle_index) -{ - struct sandbox *sandbox = current_sandbox_get(); - return sandbox_get_file_descriptor(sandbox, io_handle_index); -} - -/** - * Close the sandbox's ith io_handle - * @param io_handle_index index of the handle to close - */ -void -current_sandbox_close_file_descriptor(int io_handle_index) -{ - struct sandbox *sandbox = current_sandbox_get(); - sandbox_close_file_descriptor(sandbox, io_handle_index); -} diff --git a/runtime/src/generic_thread.c b/runtime/src/generic_thread.c index 1fb9f9c..3deea4c 100644 --- a/runtime/src/generic_thread.c +++ b/runtime/src/generic_thread.c @@ -1,6 +1,7 @@ #include #include "arch/getcycles.h" +#include "debuglog.h" /* Implemented by listener and workers */ @@ -13,3 +14,18 @@ generic_thread_initialize() generic_thread_start_timestamp = __getcycles(); generic_thread_lock_duration = 0; } + +/** + * Reports lock contention + */ +void +generic_thread_dump_lock_overhead() +{ +#ifndef NDEBUG +#ifdef LOG_LOCK_OVERHEAD + uint64_t duration = __getcycles() - generic_thread_start_timestamp; + debuglog("Locks consumed %lu / %lu cycles, or %f%%\n", generic_thread_lock_duration, duration, + (double)generic_thread_lock_duration / duration * 100); +#endif +#endif +} diff --git a/runtime/src/global_request_scheduler_minheap.c b/runtime/src/global_request_scheduler_minheap.c index c6ddd2d..d5759cc 100644 --- a/runtime/src/global_request_scheduler_minheap.c +++ b/runtime/src/global_request_scheduler_minheap.c @@ -1,6 +1,7 @@ #include #include "global_request_scheduler.h" +#include "listener_thread.h" #include "panic.h" #include "priority_queue.h" #include "runtime.h" @@ -17,7 +18,7 @@ global_request_scheduler_minheap_add(void *sandbox_request) { assert(sandbox_request); assert(global_request_scheduler_minheap); - if (unlikely(runtime_is_worker())) panic("%s is only callable by the listener thread\n", __func__); + if (unlikely(!listener_thread_is_running())) panic("%s is only callable by the listener thread\n", __func__); int return_code = priority_queue_enqueue(global_request_scheduler_minheap, sandbox_request); /* TODO: Propagate -1 to caller. Issue #91 */ diff --git a/runtime/src/http_request.c b/runtime/src/http_request.c index a53e403..2489a5b 100644 --- a/runtime/src/http_request.c +++ b/runtime/src/http_request.c @@ -1,3 +1,5 @@ +#include + #include "http_request.h" /*************************************************** @@ -16,3 +18,18 @@ http_request_get_body(struct http_request *http_request, char **body) *body = http_request->body; return http_request->body_length; } + +void +http_request_print(struct http_request *self) +{ + printf("Header Count %d\n", self->header_count); + printf("Header Content:\n"); + for (int i = 0; i < self->header_count; i++) { + for (int j = 0; j < self->headers[i].key_length; j++) { putchar(self->headers[i].key[j]); } + putchar(':'); + for (int j = 0; j < self->headers[i].value_length; j++) { putchar(self->headers[i].value[j]); } + putchar('\n'); + } + printf("Body Length %d\n", self->body_length); + printf("Body Read Length %d\n", self->body_read_length); +} diff --git a/runtime/src/http_total.c b/runtime/src/http_total.c index 6a3d3c8..c4e4ee0 100644 --- a/runtime/src/http_total.c +++ b/runtime/src/http_total.c @@ -13,6 +13,46 @@ _Atomic uint32_t http_total_2XX = 0; _Atomic uint32_t http_total_4XX = 0; #endif +void +http_total_init() +{ + atomic_init(&http_total_requests, 0); + atomic_init(&http_total_5XX, 0); +#ifdef LOG_TOTAL_REQS_RESPS + atomic_init(&http_total_2XX, 0); + atomic_init(&http_total_4XX, 0); +#endif +} + +void +http_total_increment_request() +{ + atomic_fetch_add(&http_total_requests, 1); +} + +void +http_total_increment_2xx() +{ +#ifdef LOG_TOTAL_REQS_RESPS + atomic_fetch_add(&http_total_2XX, 1); +#endif +} + +void +http_total_increment_4XX() +{ +#ifdef LOG_TOTAL_REQS_RESPS + atomic_fetch_add(&http_total_4XX, 1); +#endif +} + +void +http_total_increment_5XX() +{ + atomic_fetch_add(&http_total_5XX, 1); +} + + void http_total_log() { diff --git a/runtime/src/libc/syscall.c b/runtime/src/libc/syscall.c index 7fca22c..5735acb 100644 --- a/runtime/src/libc/syscall.c +++ b/runtime/src/libc/syscall.c @@ -131,10 +131,11 @@ err: int32_t wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size) { + struct sandbox *s = current_sandbox_get(); + if (fd == 1 || fd == 2) { - char * buffer = worker_thread_get_memory_ptr_void(buf_offset, buf_size); - struct sandbox *s = current_sandbox_get(); - int l = s->module->max_response_size - s->request_response_data_length; + char *buffer = worker_thread_get_memory_ptr_void(buf_offset, buf_size); + int l = s->module->max_response_size - s->request_response_data_length; if (l > buf_size) l = buf_size; if (l == 0) return 0; memcpy(s->request_response_data + s->request_response_data_length, buffer, l); @@ -143,7 +144,7 @@ wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size) return l; } - int f = current_sandbox_get_file_descriptor(fd); + int f = sandbox_get_file_descriptor(s, fd); char *buf = worker_thread_get_memory_ptr_void(buf_offset, buf_size); int32_t res = 0; @@ -192,7 +193,7 @@ wasm_open(int32_t path_off, int32_t flags, int32_t mode) { char *path = worker_thread_get_memory_string(path_off, MODULE_MAX_PATH_LENGTH); - int iofd = current_sandbox_initialize_io_handle(); + int iofd = sandbox_initialize_io_handle(current_sandbox_get()); if (iofd < 0) return -1; int32_t modified_flags = 0; @@ -237,7 +238,8 @@ wasm_open(int32_t path_off, int32_t flags, int32_t mode) int32_t wasm_close(int32_t io_handle_index) { - int fd = current_sandbox_get_file_descriptor(io_handle_index); + struct sandbox *sandbox = current_sandbox_get(); + int fd = sandbox_get_file_descriptor(sandbox, io_handle_index); // Silently disregard client requests to close STDIN, STDOUT, or STDERR if (fd <= STDERR_FILENO) return 0; diff --git a/runtime/src/listener_thread.c b/runtime/src/listener_thread.c index b9bc198..fff839c 100644 --- a/runtime/src/listener_thread.c +++ b/runtime/src/listener_thread.c @@ -1,6 +1,8 @@ #include +#include #include "arch/getcycles.h" +#include "client_socket.h" #include "global_request_scheduler.h" #include "generic_thread.h" #include "listener_thread.h" @@ -15,6 +17,8 @@ int listener_thread_epoll_file_descriptor; /* Timestamp when listener thread began executing */ static __thread uint64_t listener_thread_start_timestamp; +pthread_t listener_thread_id; + /** * Initializes the listener thread, pinned to core 0, and starts to listen for requests */ @@ -31,15 +35,14 @@ listener_thread_initialize(void) listener_thread_epoll_file_descriptor = epoll_create1(0); assert(listener_thread_epoll_file_descriptor >= 0); - pthread_t listener_thread; - int ret = pthread_create(&listener_thread, NULL, listener_thread_main, NULL); + int ret = pthread_create(&listener_thread_id, NULL, listener_thread_main, NULL); assert(ret == 0); - ret = pthread_setaffinity_np(listener_thread, sizeof(cpu_set_t), &cs); + ret = pthread_setaffinity_np(listener_thread_id, sizeof(cpu_set_t), &cs); assert(ret == 0); ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs); assert(ret == 0); - printf("\tListener core thread: %lx\n", listener_thread); + printf("\tListener core thread: %lx\n", listener_thread_id); } /** diff --git a/runtime/src/module.c b/runtime/src/module.c index f5917c7..65661a8 100644 --- a/runtime/src/module.c +++ b/runtime/src/module.c @@ -84,6 +84,31 @@ err: goto done; } + +/** + * Sets the HTTP Request and Response Headers and Content type on a module + * @param module + * @param request_count + * @param request_headers + * @param request_content_type + * @param response_count + * @param response_headers + * @param response_content_type + */ +static inline void +module_set_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[], + int response_count, char *response_headers, char response_content_type[]) +{ + assert(module); + module->request_header_count = request_count; + memcpy(module->request_headers, request_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT); + strcpy(module->request_content_type, request_content_type); + module->response_header_count = response_count; + memcpy(module->response_headers, response_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT); + strcpy(module->response_content_type, response_content_type); +} + + /*************************************** * Public Methods ***************************************/ diff --git a/runtime/src/module_database.c b/runtime/src/module_database.c index fc78d6f..d3daa00 100644 --- a/runtime/src/module_database.c +++ b/runtime/src/module_database.c @@ -1,4 +1,7 @@ +#include + #include "module_database.h" +#include "panic.h" /******************* * Module Database * @@ -7,6 +10,31 @@ struct module *module_database[MODULE_MAX_MODULE_COUNT] = { NULL }; size_t module_database_count = 0; +/** + * Adds a module to the in-memory module DB + * @param module module to add + * @return 0 on success. -ENOSPC when full + */ +int +module_database_add(struct module *module) +{ + assert(module_database_count <= MODULE_MAX_MODULE_COUNT); + + int rc; + + if (module_database_count == MODULE_MAX_MODULE_COUNT) goto err_no_space; + module_database[module_database_count++] = module; + + rc = 0; +done: + return rc; +err_no_space: + panic("Cannot add module. Database is full.\n"); + rc = -ENOSPC; + goto done; +} + + /** * Given a name, find the associated module * @param name diff --git a/runtime/src/priority_queue.c b/runtime/src/priority_queue.c index 6d057c9..52cfc7f 100644 --- a/runtime/src/priority_queue.c +++ b/runtime/src/priority_queue.c @@ -5,6 +5,7 @@ #include #include +#include "listener_thread.h" #include "panic.h" #include "priority_queue.h" @@ -55,7 +56,7 @@ priority_queue_is_empty(struct priority_queue *self) { assert(self != NULL); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); - assert(!runtime_is_worker() || !software_interrupt_is_enabled()); + assert(listener_thread_is_running() || !software_interrupt_is_enabled()); return self->size == 0; } @@ -133,7 +134,7 @@ priority_queue_percolate_down(struct priority_queue *self, int parent_index) assert(self != NULL); assert(self->get_priority_fn != NULL); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); - assert(runtime_is_worker()); + assert(!listener_thread_is_running()); assert(!software_interrupt_is_enabled()); bool update_highest_value = parent_index == 1; @@ -179,7 +180,6 @@ struct priority_queue * priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn) { assert(get_priority_fn != NULL); - assert(!runtime_is_worker() || !software_interrupt_is_enabled()); /* Add one to capacity because this data structure ignores the element at 0 */ size_t one_based_capacity = capacity + 1; @@ -207,7 +207,7 @@ void priority_queue_free(struct priority_queue *self) { assert(self != NULL); - assert(!runtime_is_worker() || !software_interrupt_is_enabled()); + assert(listener_thread_is_running() || !software_interrupt_is_enabled()); free(self); } @@ -220,7 +220,7 @@ int priority_queue_length_nolock(struct priority_queue *self) { assert(self != NULL); - assert(runtime_is_worker()); + assert(!listener_thread_is_running()); assert(!software_interrupt_is_enabled()); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); @@ -250,7 +250,7 @@ priority_queue_enqueue_nolock(struct priority_queue *self, void *value) { assert(self != NULL); assert(value != NULL); - assert(!runtime_is_worker() || !software_interrupt_is_enabled()); + assert(listener_thread_is_running() || !software_interrupt_is_enabled()); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); int rc; @@ -294,7 +294,7 @@ priority_queue_delete_nolock(struct priority_queue *self, void *value) { assert(self != NULL); assert(value != NULL); - assert(runtime_is_worker()); + assert(!listener_thread_is_running()); assert(!software_interrupt_is_enabled()); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); @@ -361,7 +361,7 @@ priority_queue_dequeue_if_earlier_nolock(struct priority_queue *self, void **deq assert(self != NULL); assert(dequeued_element != NULL); assert(self->get_priority_fn != NULL); - assert(runtime_is_worker()); + assert(!listener_thread_is_running()); assert(!software_interrupt_is_enabled()); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); @@ -414,7 +414,7 @@ priority_queue_top_nolock(struct priority_queue *self, void **dequeued_element) assert(self != NULL); assert(dequeued_element != NULL); assert(self->get_priority_fn != NULL); - assert(runtime_is_worker()); + assert(!listener_thread_is_running()); assert(!software_interrupt_is_enabled()); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index 236e785..b83ab3d 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -234,6 +235,26 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox) return 0; } +/** + * Initializes and returns an IO handle of a sandbox ready for use + * @param sandbox + * @param file_descriptor what we'll set on the IO handle after initialization + * @return index of handle we preopened or -1 if all io_handles are exhausted + */ +static inline int +sandbox_initialize_io_handle_and_set_file_descriptor(struct sandbox *sandbox, int file_descriptor) +{ + if (!sandbox) return -1; + if (file_descriptor < 0) return file_descriptor; + int io_handle_index = sandbox_initialize_io_handle(sandbox); + if (io_handle_index != -1) { + sandbox->io_handles[io_handle_index].file_descriptor = + file_descriptor; /* per sandbox, so synchronization necessary! */ + } + return io_handle_index; +} + + static inline void sandbox_open_http(struct sandbox *sandbox) { @@ -268,19 +289,85 @@ sandbox_initialize_io_handles_and_file_descriptors(struct sandbox *sandbox) assert(f == 2); } +/** + * Prints key performance metrics for a sandbox to runtime_sandbox_perf_log + * This is defined by an environment variable + * @param sandbox + */ +void +sandbox_print_perf(struct sandbox *sandbox) +{ + /* If the log was not defined by an environment variable, early out */ + if (runtime_sandbox_perf_log == NULL) return; + + uint32_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz; + uint32_t queued_us = (sandbox->allocation_timestamp - sandbox->request_arrival_timestamp) + / runtime_processor_speed_MHz; + uint32_t initializing_us = sandbox->initializing_duration / runtime_processor_speed_MHz; + uint32_t runnable_us = sandbox->runnable_duration / runtime_processor_speed_MHz; + uint32_t running_us = sandbox->running_duration / runtime_processor_speed_MHz; + uint32_t blocked_us = sandbox->blocked_duration / runtime_processor_speed_MHz; + uint32_t returned_us = sandbox->returned_duration / runtime_processor_speed_MHz; + + /* + * Assumption: A sandbox is never able to free pages. If linear memory management + * becomes more intelligent, then peak linear memory size needs to be tracked + * seperately from current linear memory size. + */ + fprintf(runtime_sandbox_perf_log, "%lu,%s():%d,%s,%u,%u,%u,%u,%u,%u,%u,%u,%u\n", sandbox->id, + sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state), + sandbox->module->relative_deadline_us, total_time_us, queued_us, initializing_us, runnable_us, + running_us, blocked_us, returned_us, sandbox->linear_memory_size); +} + +void +sandbox_summarize_page_allocations(struct sandbox *sandbox) +{ +#ifdef LOG_SANDBOX_MEMORY_PROFILE + // TODO: Handle interleavings + char sandbox_page_allocations_log_path[100] = {}; + sandbox_page_allocations_log_path[99] = '\0'; + snprintf(sandbox_page_allocations_log_path, 99, "%s_%d_page_allocations.csv", sandbox->module->name, + sandbox->module->port); + + debuglog("Logging to %s", sandbox_page_allocations_log_path); + + FILE *sandbox_page_allocations_log = fopen(sandbox_page_allocations_log_path, "a"); + + fprintf(sandbox_page_allocations_log, "%lu,%lu,%s,", sandbox->id, sandbox->running_duration, + sandbox_state_stringify(sandbox->state)); + for (size_t i = 0; i < sandbox->page_allocation_timestamps_size; i++) + fprintf(sandbox_page_allocations_log, "%u,", sandbox->page_allocation_timestamps[i]); + + fprintf(sandbox_page_allocations_log, "\n"); +#else + return; +#endif +} + +void +sandbox_close_http(struct sandbox *sandbox) +{ + assert(sandbox != NULL); + + int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, sandbox->client_socket_descriptor, NULL); + if (unlikely(rc < 0)) panic_err(); + + client_socket_close(sandbox->client_socket_descriptor, &sandbox->client_address); +} + /** * Sandbox execution logic * Handles setup, request parsing, WebAssembly initialization, function execution, response building and * sending, and cleanup */ void -current_sandbox_main(void) +sandbox_start(void) { struct sandbox *sandbox = current_sandbox_get(); assert(sandbox != NULL); assert(sandbox->state == SANDBOX_RUNNING); - int rc; char *error_message = ""; assert(!software_interrupt_is_enabled()); @@ -292,8 +379,7 @@ current_sandbox_main(void) sandbox_open_http(sandbox); /* Parse the request */ - rc = sandbox_receive_and_parse_client_request(sandbox); - if (rc < 0) { + if (sandbox_receive_and_parse_client_request(sandbox) < 0) { error_message = "Unable to receive and parse client request\n"; goto err; }; @@ -313,8 +399,7 @@ current_sandbox_main(void) sandbox->completion_timestamp = __getcycles(); /* Retrieve the result, construct the HTTP response, and send to client */ - rc = sandbox_build_and_send_client_response(sandbox); - if (rc < 0) { + if (sandbox_build_and_send_client_response(sandbox) < 0) { error_message = "Unable to build and send client response\n"; goto err; }; @@ -473,8 +558,7 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand /* Initialize the sandbox's context, stack, and instruction pointer */ /* stack_start points to the bottom of the usable stack, so add stack_size to get to top */ - arch_context_init(&sandbox->ctxt, (reg_t)current_sandbox_main, - (reg_t)sandbox->stack_start + sandbox->stack_size); + arch_context_init(&sandbox->ctxt, (reg_t)sandbox_start, (reg_t)sandbox->stack_start + sandbox->stack_size); /* Initialize file descriptors to -1 */ for (int i = 0; i < SANDBOX_MAX_IO_HANDLE_COUNT; i++) sandbox->io_handles[i].file_descriptor = -1; diff --git a/runtime/src/software_interrupt.c b/runtime/src/software_interrupt.c index 8c66dae..fe45d1e 100644 --- a/runtime/src/software_interrupt.c +++ b/runtime/src/software_interrupt.c @@ -11,6 +11,7 @@ #include "current_sandbox.h" #include "debuglog.h" #include "global_request_scheduler.h" +#include "listener_thread.h" #include "local_runqueue.h" #include "module.h" #include "panic.h" @@ -169,7 +170,7 @@ static inline void software_interrupt_validate_worker() { #ifndef NDEBUG - if (!runtime_is_worker()) panic("A non-worker thread has unexpectedly received a signal!"); + if (listener_thread_is_running()) panic("The listener thread unexpectedly received a signal!"); #endif } diff --git a/runtime/src/worker_thread.c b/runtime/src/worker_thread.c index 43e7b67..e528a8b 100644 --- a/runtime/src/worker_thread.c +++ b/runtime/src/worker_thread.c @@ -341,9 +341,8 @@ worker_thread_main(void *argument) * releases the linear memory, and then returns to the base context */ __attribute__((noreturn)) void -worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox) +worker_thread_on_sandbox_exit() { - assert(exiting_sandbox); assert(!software_interrupt_is_enabled()); generic_thread_dump_lock_overhead(); worker_thread_switch_to_base_context();