refactor: refactor TCP into module

master
Sean McBride 3 years ago
parent 35132ab2f2
commit 7a62b154fc

@ -9,7 +9,7 @@
#include <sys/socket.h>
#include <unistd.h>
#include "client_socket.h"
#include "tcp_session.h"
#include "debuglog.h"
#include "http_request.h"
#include "http_parser.h"
@ -106,15 +106,14 @@ http_session_free(struct http_session *session)
static inline int
http_session_send_err(struct http_session *session, int status_code, void_cb on_eagain)
{
return client_socket_send(session->socket, http_header_build(status_code), http_header_len(status_code),
on_eagain);
return tcp_session_send(session->socket, http_header_build(status_code), http_header_len(status_code),
on_eagain);
}
static inline int
http_session_send_err_oneshot(struct http_session *session, int status_code)
{
return client_socket_send_oneshot(session->socket, http_header_build(status_code),
http_header_len(status_code));
return tcp_session_send_oneshot(session->socket, http_header_build(status_code), http_header_len(status_code));
}
static inline int
@ -132,7 +131,7 @@ http_session_send_response(struct http_session *session, const char *response_co
rc = http_header_200_write(session->socket, content_type, response->length);
if (rc < 0) goto err;
rc = client_socket_send(session->socket, (const char *)response->buffer, response->length, on_eagain);
rc = tcp_session_send(session->socket, (const char *)response->buffer, response->length, on_eagain);
if (rc < 0) goto err;
http_total_increment_2xx();
@ -149,7 +148,7 @@ err:
static inline void
http_session_close(struct http_session *session)
{
return client_socket_close(session->socket, &session->client_address);
return tcp_session_close(session->socket, &session->client_address);
}

@ -13,6 +13,7 @@
#include "panic.h"
#include "pool.h"
#include "sledge_abi_symbols.h"
#include "tcp_server.h"
#include "types.h"
#include "sledge_abi_symbols.h"
#include "wasm_stack.h"
@ -30,21 +31,6 @@ extern thread_local int worker_thread_idx;
INIT_POOL(wasm_memory, wasm_memory_free)
INIT_POOL(wasm_stack, wasm_stack_free)
/*
* Defines the listen backlog, the queue length for completely established socketeds waiting to be accepted
* If this value is greater than the value in /proc/sys/net/core/somaxconn (typically 128), then it is silently
* truncated to this value. See man listen(2) for info
*
* When configuring the number of sockets to handle, the queue length of incomplete sockets defined in
* /proc/sys/net/ipv4/tcp_max_syn_backlog should also be considered. Optionally, enabling syncookies removes this
* maximum logical length. See tcp(7) for more info.
*/
#define MODULE_MAX_PENDING_CLIENT_REQUESTS 128
#if MODULE_MAX_PENDING_CLIENT_REQUESTS > 128
#warning \
"MODULE_MAX_PENDING_CLIENT_REQUESTS likely exceeds the value in /proc/sys/net/core/somaxconn and thus may be silently truncated";
#endif
/* TODO: Dynamically size based on number of threads */
#define MAX_WORKER_THREADS 64
@ -60,16 +46,17 @@ struct module {
char route[MODULE_MAX_ROUTE_LENGTH];
uint32_t stack_size; /* a specification? */
uint32_t relative_deadline_us;
uint16_t port;
struct admissions_info admissions_info;
uint64_t relative_deadline; /* cycles */
/* TCP State */
struct tcp_server tcp_server;
/* HTTP State */
size_t max_request_size;
size_t max_response_size;
char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH];
struct sockaddr_in socket_address;
int socket_descriptor;
size_t max_request_size;
size_t max_response_size;
char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH];
/* Handle and ABI Symbols for *.so file */
struct sledge_abi_symbols abi;

@ -37,15 +37,15 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox)
* seperately from current linear memory size.
*/
fprintf(sandbox_perf_log, "%lu,%s,%d,%s,%lu,%lu,%lu,,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n",
sandbox->id, sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state),
sandbox->module->relative_deadline, sandbox->total_time, queued_duration,
sandbox->duration_of_state[SANDBOX_UNINITIALIZED], sandbox->duration_of_state[SANDBOX_ALLOCATED],
sandbox->duration_of_state[SANDBOX_INITIALIZED], sandbox->duration_of_state[SANDBOX_RUNNABLE],
sandbox->duration_of_state[SANDBOX_INTERRUPTED], sandbox->duration_of_state[SANDBOX_PREEMPTED],
sandbox->duration_of_state[SANDBOX_RUNNING_SYS], sandbox->duration_of_state[SANDBOX_RUNNING_USER],
sandbox->duration_of_state[SANDBOX_ASLEEP], sandbox->duration_of_state[SANDBOX_RETURNED],
sandbox->duration_of_state[SANDBOX_COMPLETE], sandbox->duration_of_state[SANDBOX_ERROR],
runtime_processor_speed_MHz);
sandbox->id, sandbox->module->name, sandbox->module->tcp_server.port,
sandbox_state_stringify(sandbox->state), sandbox->module->relative_deadline, sandbox->total_time,
queued_duration, sandbox->duration_of_state[SANDBOX_UNINITIALIZED],
sandbox->duration_of_state[SANDBOX_ALLOCATED], sandbox->duration_of_state[SANDBOX_INITIALIZED],
sandbox->duration_of_state[SANDBOX_RUNNABLE], sandbox->duration_of_state[SANDBOX_INTERRUPTED],
sandbox->duration_of_state[SANDBOX_PREEMPTED], sandbox->duration_of_state[SANDBOX_RUNNING_SYS],
sandbox->duration_of_state[SANDBOX_RUNNING_USER], sandbox->duration_of_state[SANDBOX_ASLEEP],
sandbox->duration_of_state[SANDBOX_RETURNED], sandbox->duration_of_state[SANDBOX_COMPLETE],
sandbox->duration_of_state[SANDBOX_ERROR], runtime_processor_speed_MHz);
}
static inline void

@ -0,0 +1,102 @@
#pragma once
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "debuglog.h"
#include "likely.h"
/*
* Defines the listen backlog, the queue length for completely established socketeds waiting to be accepted
* If this value is greater than the value in /proc/sys/net/core/somaxconn (typically 128), then it is silently
* truncated to this value. See man listen(2) for info
*
* When configuring the number of sockets to handle, the queue length of incomplete sockets defined in
* /proc/sys/net/ipv4/tcp_max_syn_backlog should also be considered. Optionally, enabling syncookies removes this
* maximum logical length. See tcp(7) for more info.
*/
#define TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS 128
#if TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS > 128
#warning \
"TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS likely exceeds the value in /proc/sys/net/core/somaxconn and thus may be silently truncated";
#endif
/* L4 TCP State */
struct tcp_server {
uint16_t port;
struct sockaddr_in socket_address;
int socket_descriptor;
};
static inline void
tcp_server_init(struct tcp_server *server, uint16_t port)
{
server->port = port;
server->socket_descriptor = -1;
}
/**
* Start the module as a server listening at module->port
* @param module
* @returns 0 on success, -1 on error
*/
static inline int
tcp_server_listen(struct tcp_server *server)
{
int rc;
int optval = 1;
/* Allocate a new TCP/IP socket, setting it to be non-blocking */
int socket_descriptor = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (unlikely(socket_descriptor < 0)) goto err_create_socket;
/* Socket should never have returned on fd 0, 1, or 2 */
assert(socket_descriptor != STDIN_FILENO);
assert(socket_descriptor != STDOUT_FILENO);
assert(socket_descriptor != STDERR_FILENO);
/* Configure the socket to allow multiple sockets to bind to the same host and port */
rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
if (unlikely(rc < 0)) goto err_set_socket_option;
optval = 1;
rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
if (unlikely(rc < 0)) goto err_set_socket_option;
/* Bind name [all addresses]:[module->port] to socket */
server->socket_descriptor = socket_descriptor;
server->socket_address.sin_family = AF_INET;
server->socket_address.sin_addr.s_addr = htonl(INADDR_ANY);
server->socket_address.sin_port = htons((unsigned short)server->port);
rc = bind(socket_descriptor, (struct sockaddr *)&server->socket_address, sizeof(server->socket_address));
if (unlikely(rc < 0)) goto err_bind_socket;
/* Listen to the interface */
rc = listen(socket_descriptor, TCP_SERVER_MAX_PENDING_CLIENT_REQUESTS);
if (unlikely(rc < 0)) goto err_listen;
rc = 0;
done:
return rc;
err_listen:
err_bind_socket:
server->socket_descriptor = -1;
err_set_socket_option:
close(socket_descriptor);
err_create_socket:
err:
debuglog("Socket Error: %s", strerror(errno));
rc = -1;
goto done;
}
static inline int
tcp_server_close(struct tcp_server *server)
{
return close(server->socket_descriptor);
}

@ -9,14 +9,11 @@
#include <unistd.h>
#include "debuglog.h"
#include "http.h"
#include "http_total.h"
#include "panic.h"
#include "likely.h"
static inline void
client_socket_close(int client_socket, struct sockaddr *client_address)
tcp_session_close(int client_socket, struct sockaddr *client_address)
{
/* Should never close 0, 1, or 2 */
assert(client_socket != STDIN_FILENO);
@ -43,7 +40,7 @@ typedef void (*void_cb)(void);
* @returns 0 on success, -1 on error.
*/
static inline int
client_socket_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain)
tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain)
{
int rc;
@ -81,7 +78,7 @@ done:
* @returns 0
*/
static inline int
client_socket_send_oneshot(int client_socket, const char *buffer, size_t buffer_len)
tcp_session_send_oneshot(int client_socket, const char *buffer, size_t buffer_len)
{
return client_socket_send(client_socket, buffer, buffer_len, NULL);
return tcp_session_send(client_socket, buffer, buffer_len, NULL);
}

@ -2,12 +2,13 @@
#include <unistd.h>
#include "arch/getcycles.h"
#include "client_socket.h"
#include "global_request_scheduler.h"
#include "generic_thread.h"
#include "listener_thread.h"
#include "module.h"
#include "runtime.h"
#include "sandbox_functions.h"
#include "tcp_session.h"
/*
* Descriptor of the epoll instance used to monitor the socket descriptors of registered
@ -58,7 +59,8 @@ listener_thread_register_module(struct module *mod)
struct epoll_event accept_evt;
accept_evt.data.ptr = (void *)mod;
accept_evt.events = EPOLLIN;
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, mod->socket_descriptor, &accept_evt);
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, mod->tcp_server.socket_descriptor,
&accept_evt);
return rc;
}
@ -137,7 +139,7 @@ listener_thread_main(void *dummy)
* reason
*/
while (true) {
int client_socket = accept4(module->socket_descriptor,
int client_socket = accept4(module->tcp_server.socket_descriptor,
(struct sockaddr *)&client_address, &address_length,
SOCK_NONBLOCK);
if (unlikely(client_socket < 0)) {
@ -200,8 +202,8 @@ listener_thread_main(void *dummy)
*/
uint64_t work_admitted = admissions_control_decide(module->admissions_info.estimate);
if (work_admitted == 0) {
client_socket_send_oneshot(client_socket, http_header_build(429),
http_header_len(429));
tcp_session_send_oneshot(client_socket, http_header_build(429),
http_header_len(429));
http_session_close(session);
continue;

@ -14,6 +14,7 @@
#include "panic.h"
#include "runtime.h"
#include "scheduler.h"
#include "tcp_server.h"
#include "wasm_table.h"
/*************************
@ -92,9 +93,9 @@ module_init(struct module *module, struct module_config *config)
strncpy(module->route, config->route, MODULE_MAX_ROUTE_LENGTH);
strncpy(module->response_content_type, config->http_resp_content_type, HTTP_MAX_HEADER_VALUE_LENGTH);
module->stack_size = ((uint32_t)(round_up_to_page(stack_size == 0 ? WASM_STACK_SIZE : stack_size)));
module->socket_descriptor = -1;
module->port = config->port;
module->stack_size = ((uint32_t)(round_up_to_page(stack_size == 0 ? WASM_STACK_SIZE : stack_size)));
tcp_server_init(&module->tcp_server, config->port);
/* Deadlines */
module->relative_deadline_us = config->relative_deadline_us;
@ -134,37 +135,8 @@ err:
int
module_listen(struct module *module)
{
int rc;
/* Allocate a new TCP/IP socket, setting it to be non-blocking */
int socket_descriptor = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (unlikely(socket_descriptor < 0)) goto err_create_socket;
/* Socket should never have returned on fd 0, 1, or 2 */
assert(socket_descriptor != STDIN_FILENO);
assert(socket_descriptor != STDOUT_FILENO);
assert(socket_descriptor != STDERR_FILENO);
/* Configure the socket to allow multiple sockets to bind to the same host and port */
int optval = 1;
rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
if (unlikely(rc < 0)) goto err_set_socket_option;
optval = 1;
rc = setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
if (unlikely(rc < 0)) goto err_set_socket_option;
/* Bind name [all addresses]:[module->port] to socket */
module->socket_descriptor = socket_descriptor;
module->socket_address.sin_family = AF_INET;
module->socket_address.sin_addr.s_addr = htonl(INADDR_ANY);
module->socket_address.sin_port = htons((unsigned short)module->port);
rc = bind(socket_descriptor, (struct sockaddr *)&module->socket_address, sizeof(module->socket_address));
if (unlikely(rc < 0)) goto err_bind_socket;
/* Listen to the interface */
rc = listen(socket_descriptor, MODULE_MAX_PENDING_CLIENT_REQUESTS);
if (unlikely(rc < 0)) goto err_listen;
int rc = tcp_server_listen(&module->tcp_server);
if (rc < 0) goto err;
/* Set the socket descriptor and register with our global epoll instance to monitor for incoming HTTP
requests */
@ -175,14 +147,8 @@ module_listen(struct module *module)
done:
return rc;
err_add_to_epoll:
err_listen:
err_bind_socket:
module->socket_descriptor = -1;
err_set_socket_option:
close(socket_descriptor);
err_create_socket:
tcp_server_close(&module->tcp_server);
err:
debuglog("Socket Error: %s", strerror(errno));
rc = -1;
goto done;
}
@ -204,7 +170,7 @@ module_free(struct module *module)
/* Do not free if we still have oustanding references */
if (module->reference_count) return;
close(module->socket_descriptor);
tcp_server_close(&module->tcp_server);
sledge_abi_symbols_deinit(&module->abi);
free(module);
}

@ -60,7 +60,7 @@ module_database_find_by_socket_descriptor(int socket_descriptor)
{
for (size_t i = 0; i < module_database_count; i++) {
assert(module_database[i]);
if (module_database[i]->socket_descriptor == socket_descriptor) return module_database[i];
if (module_database[i]->tcp_server.socket_descriptor == socket_descriptor) return module_database[i];
}
return NULL;
}
@ -75,7 +75,7 @@ module_database_find_by_port(uint16_t port)
{
for (size_t i = 0; i < module_database_count; i++) {
assert(module_database[i]);
if (module_database[i]->port == port) return module_database[i];
if (module_database[i]->tcp_server.port == port) return module_database[i];
}
return NULL;
}

Loading…
Cancel
Save