From 7a62b154fc0464c17db589946ca471bf4773f080 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Mon, 25 Apr 2022 10:31:36 -0400 Subject: [PATCH] refactor: refactor TCP into module --- runtime/include/http_session.h | 13 ++- runtime/include/module.h | 29 ++--- runtime/include/sandbox_perf_log.h | 18 ++-- runtime/include/tcp_server.h | 102 ++++++++++++++++++ .../{client_socket.h => tcp_session.h} | 11 +- runtime/src/listener_thread.c | 12 ++- runtime/src/module.c | 50 ++------- runtime/src/module_database.c | 4 +- 8 files changed, 146 insertions(+), 93 deletions(-) create mode 100644 runtime/include/tcp_server.h rename runtime/include/{client_socket.h => tcp_session.h} (83%) diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index 475f31d..698b2f6 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -9,7 +9,7 @@ #include #include -#include "client_socket.h" +#include "tcp_session.h" #include "debuglog.h" #include "http_request.h" #include "http_parser.h" @@ -106,15 +106,14 @@ http_session_free(struct http_session *session) static inline int http_session_send_err(struct http_session *session, int status_code, void_cb on_eagain) { - return client_socket_send(session->socket, http_header_build(status_code), http_header_len(status_code), - on_eagain); + return tcp_session_send(session->socket, http_header_build(status_code), http_header_len(status_code), + on_eagain); } static inline int http_session_send_err_oneshot(struct http_session *session, int status_code) { - return client_socket_send_oneshot(session->socket, http_header_build(status_code), - http_header_len(status_code)); + return tcp_session_send_oneshot(session->socket, http_header_build(status_code), http_header_len(status_code)); } static inline int @@ -132,7 +131,7 @@ http_session_send_response(struct http_session *session, const char *response_co rc = http_header_200_write(session->socket, content_type, response->length); if (rc < 0) goto err; - rc = client_socket_send(session->socket, (const char *)response->buffer, response->length, on_eagain); + rc = tcp_session_send(session->socket, (const char *)response->buffer, response->length, on_eagain); if (rc < 0) goto err; http_total_increment_2xx(); @@ -149,7 +148,7 @@ err: static inline void http_session_close(struct http_session *session) { - return client_socket_close(session->socket, &session->client_address); + return tcp_session_close(session->socket, &session->client_address); } diff --git a/runtime/include/module.h b/runtime/include/module.h index 288090f..0f6c7c6 100644 --- a/runtime/include/module.h +++ b/runtime/include/module.h @@ -13,6 +13,7 @@ #include "panic.h" #include "pool.h" #include "sledge_abi_symbols.h" +#include "tcp_server.h" #include "types.h" #include "sledge_abi_symbols.h" #include "wasm_stack.h" @@ -30,21 +31,6 @@ extern thread_local int worker_thread_idx; INIT_POOL(wasm_memory, wasm_memory_free) INIT_POOL(wasm_stack, wasm_stack_free) -/* - * Defines the listen backlog, the queue length for completely established socketeds waiting to be accepted - * If this value is greater than the value in /proc/sys/net/core/somaxconn (typically 128), then it is silently - * truncated to this value. See man listen(2) for info - * - * When configuring the number of sockets to handle, the queue length of incomplete sockets defined in - * /proc/sys/net/ipv4/tcp_max_syn_backlog should also be considered. Optionally, enabling syncookies removes this - * maximum logical length. See tcp(7) for more info. - */ -#define MODULE_MAX_PENDING_CLIENT_REQUESTS 128 -#if MODULE_MAX_PENDING_CLIENT_REQUESTS > 128 -#warning \ - "MODULE_MAX_PENDING_CLIENT_REQUESTS likely exceeds the value in /proc/sys/net/core/somaxconn and thus may be silently truncated"; -#endif - /* TODO: Dynamically size based on number of threads */ #define MAX_WORKER_THREADS 64 @@ -60,16 +46,17 @@ struct module { char route[MODULE_MAX_ROUTE_LENGTH]; uint32_t stack_size; /* a specification? */ uint32_t relative_deadline_us; - uint16_t port; struct admissions_info admissions_info; uint64_t relative_deadline; /* cycles */ + /* TCP State */ + struct tcp_server tcp_server; + /* HTTP State */ - size_t max_request_size; - size_t max_response_size; - char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH]; - struct sockaddr_in socket_address; - int socket_descriptor; + size_t max_request_size; + size_t max_response_size; + char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH]; + /* Handle and ABI Symbols for *.so file */ struct sledge_abi_symbols abi; diff --git a/runtime/include/sandbox_perf_log.h b/runtime/include/sandbox_perf_log.h index 8da8a38..7fbe176 100644 --- a/runtime/include/sandbox_perf_log.h +++ b/runtime/include/sandbox_perf_log.h @@ -37,15 +37,15 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox) * seperately from current linear memory size. */ fprintf(sandbox_perf_log, "%lu,%s,%d,%s,%lu,%lu,%lu,,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n", - sandbox->id, sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state), - sandbox->module->relative_deadline, sandbox->total_time, queued_duration, - sandbox->duration_of_state[SANDBOX_UNINITIALIZED], sandbox->duration_of_state[SANDBOX_ALLOCATED], - sandbox->duration_of_state[SANDBOX_INITIALIZED], sandbox->duration_of_state[SANDBOX_RUNNABLE], - sandbox->duration_of_state[SANDBOX_INTERRUPTED], sandbox->duration_of_state[SANDBOX_PREEMPTED], - sandbox->duration_of_state[SANDBOX_RUNNING_SYS], sandbox->duration_of_state[SANDBOX_RUNNING_USER], - sandbox->duration_of_state[SANDBOX_ASLEEP], sandbox->duration_of_state[SANDBOX_RETURNED], - sandbox->duration_of_state[SANDBOX_COMPLETE], sandbox->duration_of_state[SANDBOX_ERROR], - runtime_processor_speed_MHz); + sandbox->id, sandbox->module->name, sandbox->module->tcp_server.port, + sandbox_state_stringify(sandbox->state), sandbox->module->relative_deadline, sandbox->total_time, + queued_duration, sandbox->duration_of_state[SANDBOX_UNINITIALIZED], + sandbox->duration_of_state[SANDBOX_ALLOCATED], sandbox->duration_of_state[SANDBOX_INITIALIZED], + sandbox->duration_of_state[SANDBOX_RUNNABLE], sandbox->duration_of_state[SANDBOX_INTERRUPTED], + sandbox->duration_of_state[SANDBOX_PREEMPTED], sandbox->duration_of_state[SANDBOX_RUNNING_SYS], + sandbox->duration_of_state[SANDBOX_RUNNING_USER], sandbox->duration_of_state[SANDBOX_ASLEEP], + sandbox->duration_of_state[SANDBOX_RETURNED], sandbox->duration_of_state[SANDBOX_COMPLETE], + sandbox->duration_of_state[SANDBOX_ERROR], runtime_processor_speed_MHz); } static inline void diff --git a/runtime/include/tcp_server.h b/runtime/include/tcp_server.h new file mode 100644 index 0000000..41d5a9a --- /dev/null +++ b/runtime/include/tcp_server.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "debuglog.h" +#include "likely.h" + +/* + * Defines the listen backlog, the queue length for completely established socketeds waiting to be accepted + * If this value is greater than the value in /proc/sys/net/core/somaxconn (typically 128), then it is silently + * truncated to this value. See man listen(2) for info + * + * When configuring the number of sockets to handle, the queue length of incomplete sockets defined in + * /proc/sys/net/ipv4/tcp_max_syn_backlog should also be considered. Optionally, enabling syncookies removes this + * maximum logical length. See tcp(7) for more info. + */ +#define TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS 128 +#if TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS > 128 +#warning \ + "TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS likely exceeds the value in /proc/sys/net/core/somaxconn and thus may be silently truncated"; +#endif + +/* L4 TCP State */ +struct tcp_server { + uint16_t port; + struct sockaddr_in socket_address; + int socket_descriptor; +}; + +static inline void +tcp_server_init(struct tcp_server *server, uint16_t port) +{ + server->port = port; + server->socket_descriptor = -1; +} + +/** + * Start the module as a server listening at module->port + * @param module + * @returns 0 on success, -1 on error + */ +static inline int +tcp_server_listen(struct tcp_server *server) +{ + int rc; + int optval = 1; + + /* Allocate a new TCP/IP socket, setting it to be non-blocking */ + int socket_descriptor = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (unlikely(socket_descriptor < 0)) goto err_create_socket; + + /* Socket should never have returned on fd 0, 1, or 2 */ + assert(socket_descriptor != STDIN_FILENO); + assert(socket_descriptor != STDOUT_FILENO); + assert(socket_descriptor != STDERR_FILENO); + + /* Configure the socket to allow multiple sockets to bind to the same host and port */ + rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); + if (unlikely(rc < 0)) goto err_set_socket_option; + optval = 1; + rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); + if (unlikely(rc < 0)) goto err_set_socket_option; + + /* Bind name [all addresses]:[module->port] to socket */ + server->socket_descriptor = socket_descriptor; + server->socket_address.sin_family = AF_INET; + server->socket_address.sin_addr.s_addr = htonl(INADDR_ANY); + server->socket_address.sin_port = htons((unsigned short)server->port); + rc = bind(socket_descriptor, (struct sockaddr *)&server->socket_address, sizeof(server->socket_address)); + if (unlikely(rc < 0)) goto err_bind_socket; + + /* Listen to the interface */ + rc = listen(socket_descriptor, TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS); + if (unlikely(rc < 0)) goto err_listen; + + rc = 0; +done: + return rc; +err_listen: +err_bind_socket: + server->socket_descriptor = -1; +err_set_socket_option: + close(socket_descriptor); +err_create_socket: +err: + debuglog("Socket Error: %s", strerror(errno)); + rc = -1; + goto done; +} + +static inline int +tcp_server_close(struct tcp_server *server) +{ + return close(server->socket_descriptor); +} diff --git a/runtime/include/client_socket.h b/runtime/include/tcp_session.h similarity index 83% rename from runtime/include/client_socket.h rename to runtime/include/tcp_session.h index 3717c10..0bc384e 100644 --- a/runtime/include/client_socket.h +++ b/runtime/include/tcp_session.h @@ -9,14 +9,11 @@ #include #include "debuglog.h" -#include "http.h" -#include "http_total.h" #include "panic.h" #include "likely.h" - static inline void -client_socket_close(int client_socket, struct sockaddr *client_address) +tcp_session_close(int client_socket, struct sockaddr *client_address) { /* Should never close 0, 1, or 2 */ assert(client_socket != STDIN_FILENO); @@ -43,7 +40,7 @@ typedef void (*void_cb)(void); * @returns 0 on success, -1 on error. */ static inline int -client_socket_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain) +tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain) { int rc; @@ -81,7 +78,7 @@ done: * @returns 0 */ static inline int -client_socket_send_oneshot(int client_socket, const char *buffer, size_t buffer_len) +tcp_session_send_oneshot(int client_socket, const char *buffer, size_t buffer_len) { - return client_socket_send(client_socket, buffer, buffer_len, NULL); + return tcp_session_send(client_socket, buffer, buffer_len, NULL); } diff --git a/runtime/src/listener_thread.c b/runtime/src/listener_thread.c index c09cb65..453f29e 100644 --- a/runtime/src/listener_thread.c +++ b/runtime/src/listener_thread.c @@ -2,12 +2,13 @@ #include #include "arch/getcycles.h" -#include "client_socket.h" #include "global_request_scheduler.h" #include "generic_thread.h" #include "listener_thread.h" +#include "module.h" #include "runtime.h" #include "sandbox_functions.h" +#include "tcp_session.h" /* * Descriptor of the epoll instance used to monitor the socket descriptors of registered @@ -58,7 +59,8 @@ listener_thread_register_module(struct module *mod) struct epoll_event accept_evt; accept_evt.data.ptr = (void *)mod; accept_evt.events = EPOLLIN; - rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, mod->socket_descriptor, &accept_evt); + rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, mod->tcp_server.socket_descriptor, + &accept_evt); return rc; } @@ -137,7 +139,7 @@ listener_thread_main(void *dummy) * reason */ while (true) { - int client_socket = accept4(module->socket_descriptor, + int client_socket = accept4(module->tcp_server.socket_descriptor, (struct sockaddr *)&client_address, &address_length, SOCK_NONBLOCK); if (unlikely(client_socket < 0)) { @@ -200,8 +202,8 @@ listener_thread_main(void *dummy) */ uint64_t work_admitted = admissions_control_decide(module->admissions_info.estimate); if (work_admitted == 0) { - client_socket_send_oneshot(client_socket, http_header_build(429), - http_header_len(429)); + tcp_session_send_oneshot(client_socket, http_header_build(429), + http_header_len(429)); http_session_close(session); continue; diff --git a/runtime/src/module.c b/runtime/src/module.c index 21f4f28..69d593e 100644 --- a/runtime/src/module.c +++ b/runtime/src/module.c @@ -14,6 +14,7 @@ #include "panic.h" #include "runtime.h" #include "scheduler.h" +#include "tcp_server.h" #include "wasm_table.h" /************************* @@ -92,9 +93,9 @@ module_init(struct module *module, struct module_config *config) strncpy(module->route, config->route, MODULE_MAX_ROUTE_LENGTH); strncpy(module->response_content_type, config->http_resp_content_type, HTTP_MAX_HEADER_VALUE_LENGTH); - module->stack_size = ((uint32_t)(round_up_to_page(stack_size == 0 ? WASM_STACK_SIZE : stack_size))); - module->socket_descriptor = -1; - module->port = config->port; + module->stack_size = ((uint32_t)(round_up_to_page(stack_size == 0 ? WASM_STACK_SIZE : stack_size))); + + tcp_server_init(&module->tcp_server, config->port); /* Deadlines */ module->relative_deadline_us = config->relative_deadline_us; @@ -134,37 +135,8 @@ err: int module_listen(struct module *module) { - int rc; - - /* Allocate a new TCP/IP socket, setting it to be non-blocking */ - int socket_descriptor = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); - if (unlikely(socket_descriptor < 0)) goto err_create_socket; - - /* Socket should never have returned on fd 0, 1, or 2 */ - assert(socket_descriptor != STDIN_FILENO); - assert(socket_descriptor != STDOUT_FILENO); - assert(socket_descriptor != STDERR_FILENO); - - /* Configure the socket to allow multiple sockets to bind to the same host and port */ - int optval = 1; - rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); - if (unlikely(rc < 0)) goto err_set_socket_option; - optval = 1; - rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); - if (unlikely(rc < 0)) goto err_set_socket_option; - - /* Bind name [all addresses]:[module->port] to socket */ - module->socket_descriptor = socket_descriptor; - module->socket_address.sin_family = AF_INET; - module->socket_address.sin_addr.s_addr = htonl(INADDR_ANY); - module->socket_address.sin_port = htons((unsigned short)module->port); - rc = bind(socket_descriptor, (struct sockaddr *)&module->socket_address, sizeof(module->socket_address)); - if (unlikely(rc < 0)) goto err_bind_socket; - - /* Listen to the interface */ - rc = listen(socket_descriptor, MODULE_MAX_PENDING_CLIENT_REQUESTS); - if (unlikely(rc < 0)) goto err_listen; - + int rc = tcp_server_listen(&module->tcp_server); + if (rc < 0) goto err; /* Set the socket descriptor and register with our global epoll instance to monitor for incoming HTTP requests */ @@ -175,14 +147,8 @@ module_listen(struct module *module) done: return rc; err_add_to_epoll: -err_listen: -err_bind_socket: - module->socket_descriptor = -1; -err_set_socket_option: - close(socket_descriptor); -err_create_socket: + tcp_server_close(&module->tcp_server); err: - debuglog("Socket Error: %s", strerror(errno)); rc = -1; goto done; } @@ -204,7 +170,7 @@ module_free(struct module *module) /* Do not free if we still have oustanding references */ if (module->reference_count) return; - close(module->socket_descriptor); + tcp_server_close(&module->tcp_server); sledge_abi_symbols_deinit(&module->abi); free(module); } diff --git a/runtime/src/module_database.c b/runtime/src/module_database.c index 215a8f5..5d91316 100644 --- a/runtime/src/module_database.c +++ b/runtime/src/module_database.c @@ -60,7 +60,7 @@ module_database_find_by_socket_descriptor(int socket_descriptor) { for (size_t i = 0; i < module_database_count; i++) { assert(module_database[i]); - if (module_database[i]->socket_descriptor == socket_descriptor) return module_database[i]; + if (module_database[i]->tcp_server.socket_descriptor == socket_descriptor) return module_database[i]; } return NULL; } @@ -75,7 +75,7 @@ module_database_find_by_port(uint16_t port) { for (size_t i = 0; i < module_database_count; i++) { assert(module_database[i]); - if (module_database[i]->port == port) return module_database[i]; + if (module_database[i]->tcp_server.port == port) return module_database[i]; } return NULL; }