From bca75a9dd4ff063c0400499548daf422ceee5245 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Fri, 20 May 2022 16:26:37 -0400 Subject: [PATCH 1/8] refactor: no http_session in sandbox lifetime. --- runtime/include/http.h | 27 +- runtime/include/http_session.h | 204 +++++++----- runtime/include/http_total.h | 2 +- runtime/include/listener_thread.h | 2 + runtime/include/sandbox_set_as_error.h | 9 +- runtime/include/sandbox_set_as_returned.h | 6 + .../include/scheduler_execute_epoll_loop.h | 2 - runtime/include/tcp_session.h | 57 +--- runtime/src/current_sandbox.c | 21 +- runtime/src/listener_thread.c | 294 +++++++++++------- runtime/src/sandbox.c | 5 +- 11 files changed, 365 insertions(+), 264 deletions(-) diff --git a/runtime/include/http.h b/runtime/include/http.h index 9ed8f5e..df3b9f5 100644 --- a/runtime/include/http.h +++ b/runtime/include/http.h @@ -13,17 +13,12 @@ #define HTTP_MAX_QUERY_PARAM_COUNT 16 #define HTTP_MAX_QUERY_PARAM_LENGTH 32 -#define HTTP_RESPONSE_200_TEMPLATE \ - "HTTP/1.1 200 OK\r\n" \ - "Server: SLEdge\r\n" \ - "Connection: close\r\n" \ - "Content-Type: %s\r\n" \ - "Content-Length: %lu\r\n" \ +#define HTTP_RESPONSE_200_OK \ + "HTTP/1.1 200 OK\r\n" \ + "Server: SLEdge\r\n" \ + "Connection: close\r\n" \ "\r\n" -/* The sum of format specifier characters in the template above */ -#define HTTP_RESPONSE_200_TEMPLATE_FORMAT_SPECIFIER_LENGTH 5 - #define HTTP_RESPONSE_400_BAD_REQUEST \ "HTTP/1.1 400 Bad Request\r\n" \ "Server: SLEdge\r\n" \ @@ -60,18 +55,16 @@ "Connection: close\r\n" \ "\r\n" -static inline int -http_header_200_write(int fd, const char *content_type, size_t content_length) -{ - return dprintf(fd, HTTP_RESPONSE_200_TEMPLATE, content_type, content_length); -} - static inline const char * http_header_build(int status_code) { const char *response; int rc; switch (status_code) { + case 200: + response = HTTP_RESPONSE_200_OK; + http_total_increment_2XX(); + break; case 400: response = HTTP_RESPONSE_400_BAD_REQUEST; http_total_increment_4XX(); @@ -103,10 +96,12 @@ http_header_build(int status_code) return response; } -static inline int +static inline size_t http_header_len(int status_code) { switch (status_code) { + case 200: + return strlen(HTTP_RESPONSE_200_OK); case 400: return strlen(HTTP_RESPONSE_400_BAD_REQUEST); case 404: diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index 41becb5..f4147b6 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -22,19 +22,54 @@ #define u8 uint8_t VEC(u8) +enum http_session_state +{ + HTTP_SESSION_UNINITIALIZED = 0, + HTTP_SESSION_INITIALIZED, + HTTP_SESSION_RECEIVING_REQUEST, + HTTP_SESSION_RECEIVE_REQUEST_BLOCKED, + HTTP_SESSION_RECEIVED_REQUEST, + HTTP_SESSION_EXECUTING, + HTTP_SESSION_EXECUTION_COMPLETE, + HTTP_SESSION_SENDING_RESPONSE_HEADER, + HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED, + HTTP_SESSION_SENDING_RESPONSE, + HTTP_SESSION_SEND_RESPONSE_BLOCKED, + HTTP_SESSION_SENT_RESPONSE +}; + struct http_session { - struct sockaddr client_address; /* client requesting connection! */ - int socket; - struct http_parser http_parser; - struct http_request http_request; - struct vec_u8 request_buffer; - struct vec_u8 response_buffer; - struct tenant *tenant; /* Backlink required when read blocks on listener core */ - uint64_t request_arrival_timestamp; + enum http_session_state state; + struct sockaddr client_address; /* client requesting connection! */ + int socket; + struct http_parser http_parser; + struct http_request http_request; + struct vec_u8 request_buffer; + const char *response_header; + size_t response_header_length; + size_t response_header_written; + struct vec_u8 response_buffer; + size_t response_buffer_written; + struct tenant *tenant; /* Backlink required when read blocks on listener core */ + uint64_t request_arrival_timestamp; }; /** - * @param session sandbox that we want to init + * Initalize state associated with an http parser + * Because the http_parser structure uses pointers to the request buffer, if realloc moves the request + * buffer, this should be called to clear stale state to force parsing to restart + */ +static inline void +http_session_parser_init(struct http_session *session) +{ + memset(&session->http_request, 0, sizeof(struct http_request)); + http_parser_init(&session->http_parser, HTTP_REQUEST); + /* Set the session as the data the http-parser has access to */ + session->http_parser.data = &session->http_request; +} + +/** + * @param session session that we want to init * @returns 0 on success, -1 on error */ static inline int @@ -42,6 +77,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str struct tenant *tenant, uint64_t request_arrival_timestamp) { assert(session != NULL); + assert(session->state == HTTP_SESSION_UNINITIALIZED); assert(socket_descriptor >= 0); assert(socket_address != NULL); @@ -50,12 +86,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str session->request_arrival_timestamp = request_arrival_timestamp; memcpy(&session->client_address, socket_address, sizeof(struct sockaddr)); - http_parser_init(&session->http_parser, HTTP_REQUEST); - - /* Set the http_request member as the data pointer the http_parser callbacks receive */ - session->http_parser.data = &session->http_request; - - memset(&session->http_request, 0, sizeof(struct http_request)); + http_session_parser_init(session); int rc = vec_u8_init(&session->request_buffer, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE); if (rc < 0) return -1; @@ -63,6 +94,8 @@ http_session_init(struct http_session *session, int socket_descriptor, const str /* Defer allocating response until we've matched a route */ session->response_buffer.buffer = NULL; + session->state = HTTP_SESSION_INITIALIZED; + return 0; } @@ -123,22 +156,20 @@ http_session_free(struct http_session *session) } /** - * Writes buffer to the client socket - * @param session - the HTTP session we want to send a 500 to - * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns 0 on success, -1 on error. + * Set Response Header + * @param session - the HTTP session we want to set the response header of + * @param status_code */ -static inline int -http_session_send_err(struct http_session *session, int status_code, void_cb on_eagain) +static inline void +http_session_set_response_header(struct http_session *session, int status_code) { assert(session != NULL); assert(status_code >= 100 && status_code <= 599); - return tcp_session_send(session->socket, http_header_build(status_code), http_header_len(status_code), - on_eagain); + session->response_header = http_header_build(status_code); + session->response_header_length = http_header_len(status_code); } - static inline void http_session_close(struct http_session *session) { @@ -148,52 +179,58 @@ http_session_close(struct http_session *session) } /** - * Writes an HTTP error code to the TCP socket associated with the session - * Closes without writing if TCP socket would have blocked - * Also cleans up the HTTP session + * Writes an HTTP header to the client + * @param client_socket - the client + * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out + * @returns 0 on success, -1 on error, -2 unused, -3 on eagain */ static inline int -http_session_send_err_oneshot(struct http_session *session, int status_code) +http_session_send_response_header(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); - assert(status_code >= 100 && status_code <= 599); - int rc = tcp_session_send_oneshot(session->socket, http_header_build(status_code), - http_header_len(status_code)); - http_session_close(session); - http_session_free(session); + while (session->response_header_length > session->response_header_written) { + ssize_t sent = + tcp_session_send(session->socket, + (const char *)&session->response_header[session->response_header_written], + session->response_header_length - session->response_header_written, on_eagain, + session); + if (sent < 0) { + return (int)sent; + } else { + session->response_header_written += (size_t)sent; + } + } - return rc; + return 0; } +/** + * Writes an HTTP body to the client + * @param client_socket - the client + * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out + * @returns 0 on success, -1 on error, -2 unused, -3 on eagain + */ static inline int -http_session_send_response(struct http_session *session, const char *response_content_type, void_cb on_eagain) +http_session_send_response_body(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); assert(session->response_buffer.buffer != NULL); - assert(response_content_type != NULL); - - int rc = 0; - - struct vec_u8 *response_buffer = &session->response_buffer; - - /* Send HTTP Response Header and Body */ - rc = http_header_200_write(session->socket, response_content_type, response_buffer->length); - if (rc < 0) goto err; - rc = tcp_session_send(session->socket, (const char *)response_buffer->buffer, response_buffer->length, - on_eagain); - if (rc < 0) goto err; - - http_total_increment_2xx(); - rc = 0; + while (session->response_buffer_written < session->response_buffer.length) { + ssize_t sent = + tcp_session_send(session->socket, + (const char *)&session->response_buffer.buffer[session->response_buffer_written], + session->response_buffer.length - session->response_buffer_written, on_eagain, + session); + if (sent < 0) { + return (int)sent; + } else { + session->response_buffer_written += (size_t)sent; + } + } -done: - return rc; -err: - debuglog("Error sending to client: %s", strerror(errno)); - rc = -1; - goto done; + return 0; } static inline bool @@ -202,20 +239,6 @@ http_session_request_buffer_is_full(struct http_session *session) return session->request_buffer.length == session->request_buffer.capacity; } -/** - * Initalize state associated with an http parser - * Because the http_parser structure uses pointers to the request buffer, if realloc moves the request - * buffer, this should be called to clear stale state to force parsing to restart - */ -static inline void -http_session_parser_init(struct http_session *session) -{ - memset(&session->http_request, 0, sizeof(struct http_request)); - http_parser_init(&session->http_parser, HTTP_REQUEST); - /* Set the session as the data the http-parser has access to */ - session->http_parser.data = &session->http_request; -} - static inline int http_session_request_buffer_grow(struct http_session *session) { @@ -249,10 +272,11 @@ http_session_request_buffer_resize(struct http_session *session, int required_si return 0; } -typedef void (*http_session_receive_request_egain_cb)(struct http_session *); +typedef void (*http_session_cb)(struct http_session *); +/* TODO: Why is this not in TCP logic? */ static inline ssize_t -http_session_receive_raw(struct http_session *session, http_session_receive_request_egain_cb on_eagain) +http_session_receive_raw(struct http_session *session, void_star_cb on_eagain) { assert(session->request_buffer.capacity > session->request_buffer.length); @@ -333,18 +357,19 @@ http_session_log_query_params(struct http_session *session) /** * Receive and Parse the Request for the current sandbox - * @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space + * @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space, -3 EAGAIN */ static inline int -http_session_receive_request(struct http_session *session, http_session_receive_request_egain_cb on_eagain) +http_session_receive_request(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); assert(session->request_buffer.capacity > 0); assert(session->request_buffer.length <= session->request_buffer.capacity); + assert(session->state == HTTP_SESSION_INITIALIZED || session->state == HTTP_SESSION_RECEIVE_REQUEST_BLOCKED); - int rc = 0; + session->state = HTTP_SESSION_RECEIVING_REQUEST; - http_session_parser_init(session); + int rc = 0; while (!session->http_request.message_end) { /* If we know the header size and content-length, resize exactly. Otherwise double */ @@ -361,6 +386,7 @@ http_session_receive_request(struct http_session *session, http_session_receive_ if (rc != 0) goto err_nobufs; } + /* TODO: Why is this a call to TCP receive? */ ssize_t bytes_received = http_session_receive_raw(session, on_eagain); if (bytes_received == -EAGAIN) goto err_eagain; if (bytes_received == -1) goto err; @@ -370,6 +396,7 @@ http_session_receive_request(struct http_session *session, http_session_receive_ } assert(session->http_request.message_end == true); + session->state = HTTP_SESSION_RECEIVED_REQUEST; http_session_log_query_params(session); @@ -416,3 +443,28 @@ http_session_write_response(struct http_session *session, const uint8_t *source, DONE: return rc; } + +static inline void +http_session_send_response(struct http_session *session, void_star_cb on_eagain) +{ + int rc = http_session_send_response_header(session, on_eagain); + if (unlikely(rc == -3)) { + /* session blocked and registered to epoll so continue to next handle */ + return; + } else if (unlikely(rc == -1)) { + http_session_close(session); + return; + } + + rc = http_session_send_response_body(session, on_eagain); + if (unlikely(rc == -3)) { + /* session blocked and registered to epoll so continue to next handle */ + return; + } else if (unlikely(rc == -1)) { + http_session_close(session); + return; + } + + http_session_close(session); + http_session_free(session); +} diff --git a/runtime/include/http_total.h b/runtime/include/http_total.h index 2f74cad..09fcf6f 100644 --- a/runtime/include/http_total.h +++ b/runtime/include/http_total.h @@ -35,7 +35,7 @@ http_total_increment_request() } static inline void -http_total_increment_2xx() +http_total_increment_2XX() { #ifdef LOG_TOTAL_REQS_RESPS atomic_fetch_add(&http_total_2XX, 1); diff --git a/runtime/include/listener_thread.h b/runtime/include/listener_thread.h index 8621eeb..8a91274 100644 --- a/runtime/include/listener_thread.h +++ b/runtime/include/listener_thread.h @@ -4,6 +4,7 @@ #include #include "generic_thread.h" +#include "http_session.h" #include "module.h" #define LISTENER_THREAD_CORE_ID 1 @@ -12,6 +13,7 @@ extern pthread_t listener_thread_id; void listener_thread_initialize(void); noreturn void *listener_thread_main(void *dummy); +void listener_thread_register_http_session(struct http_session *http); /** * Used to determine if running in the context of a listener thread diff --git a/runtime/include/sandbox_set_as_error.h b/runtime/include/sandbox_set_as_error.h index a76b1b9..4b949d8 100644 --- a/runtime/include/sandbox_set_as_error.h +++ b/runtime/include/sandbox_set_as_error.h @@ -4,6 +4,7 @@ #include #include "arch/getcycles.h" +#include "listener_thread.h" #include "local_completion_queue.h" #include "local_runqueue.h" #include "sandbox_state.h" @@ -37,8 +38,6 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state) case SANDBOX_RUNNING_SYS: { local_runqueue_delete(sandbox); sandbox_free_linear_memory(sandbox); - http_session_free(sandbox->http); - sandbox->http = NULL; break; } default: { @@ -59,6 +58,12 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state) /* Admissions Control Post Processing */ admissions_control_subtract(sandbox->admissions_estimate); + /* Return HTTP session to listener core to be written back to client */ + http_session_set_response_header(sandbox->http, 500); + sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session); + sandbox->http = NULL; + /* Terminal State Logging */ sandbox_perf_log_print_entry(sandbox); sandbox_summarize_page_allocations(sandbox); diff --git a/runtime/include/sandbox_set_as_returned.h b/runtime/include/sandbox_set_as_returned.h index 4032e39..f54e49a 100644 --- a/runtime/include/sandbox_set_as_returned.h +++ b/runtime/include/sandbox_set_as_returned.h @@ -4,6 +4,7 @@ #include #include "arch/getcycles.h" +#include "listener_thread.h" #include "local_runqueue.h" #include "panic.h" #include "sandbox_functions.h" @@ -50,6 +51,11 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state) sandbox_state_totals_increment(SANDBOX_RETURNED); sandbox_state_totals_decrement(last_state); + http_session_set_response_header(sandbox->http, 200); + sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session); + sandbox->http = NULL; + /* State Change Hooks */ sandbox_state_transition_from_hook(sandbox, last_state); sandbox_state_transition_to_hook(sandbox, SANDBOX_RETURNED); diff --git a/runtime/include/scheduler_execute_epoll_loop.h b/runtime/include/scheduler_execute_epoll_loop.h index 683b18c..2bb11bc 100644 --- a/runtime/include/scheduler_execute_epoll_loop.h +++ b/runtime/include/scheduler_execute_epoll_loop.h @@ -61,8 +61,6 @@ scheduler_execute_epoll_loop(void) case SANDBOX_ERROR: panic("Expected to have closed socket"); default: - http_session_send_err_oneshot(sandbox->http, 503); - http_session_close(sandbox->http); sandbox_set_as_error(sandbox, sandbox->state); } } else { diff --git a/runtime/include/tcp_session.h b/runtime/include/tcp_session.h index 6aa9bce..4e1ea8e 100644 --- a/runtime/include/tcp_session.h +++ b/runtime/include/tcp_session.h @@ -30,55 +30,30 @@ tcp_session_close(int client_socket, struct sockaddr *client_address) } } -typedef void (*void_cb)(void); +typedef void (*void_star_cb)(void *); /** * Writes buffer to the client socket * @param client_socket - the client * @param buffer - buffer to write to socket * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns 0 on success, -1 on error. + * @returns nwritten on success, -1 on error, -2 unused, -3 on eagain */ -static inline int -tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain) +static inline ssize_t +tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr) { - int rc; - - size_t cursor = 0; - - while (cursor < buffer_len) { - ssize_t sent = write(client_socket, &buffer[cursor], buffer_len - cursor); - if (sent < 0) { - if (errno == EAGAIN) { - if (on_eagain == NULL) { - rc = -1; - goto done; - } - on_eagain(); - } else { - debuglog("Error sending to client: %s", strerror(errno)); - rc = -1; - goto done; - } + assert(buffer != NULL); + assert(buffer_len > 0); + + ssize_t sent = write(client_socket, buffer, buffer_len); + if (sent < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (on_eagain != NULL) on_eagain(dataptr); + return -3; + } else { + return -1; } + } - assert(sent > 0); - cursor += (size_t)sent; - }; - - rc = 0; -done: - return rc; -} - -/** - * Rejects request due to admission control or error - * @param client_socket - the client we are rejecting - * @param buffer - buffer to write to socket - * @returns 0 - */ -static inline int -tcp_session_send_oneshot(int client_socket, const char *buffer, size_t buffer_len) -{ - return tcp_session_send(client_socket, buffer, buffer_len, NULL); + return sent; } diff --git a/runtime/src/current_sandbox.c b/runtime/src/current_sandbox.c index a0ca524..1250f8e 100644 --- a/runtime/src/current_sandbox.c +++ b/runtime/src/current_sandbox.c @@ -46,7 +46,7 @@ current_sandbox_sleep() /** * @brief Switches from an executing sandbox to the worker thread base context * - * This places the current sandbox on the completion queue if in RETURNED state + * This places the current sandbox on the completion queue if in RETURNED or RUNNING_SYS state */ void current_sandbox_exit() @@ -86,37 +86,29 @@ current_sandbox_wasm_trap_handler(int trapno) switch (trapno) { case WASM_TRAP_INVALID_INDEX: error_message = "WebAssembly Trap: Invalid Index\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_MISMATCHED_TYPE: error_message = "WebAssembly Trap: Mismatched Type\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_PROTECTED_CALL_STACK_OVERFLOW: error_message = "WebAssembly Trap: Protected Call Stack Overflow\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_OUT_OF_BOUNDS_LINEAR_MEMORY: error_message = "WebAssembly Trap: Out of Bounds Linear Memory Access\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_ILLEGAL_ARITHMETIC_OPERATION: error_message = "WebAssembly Trap: Illegal Arithmetic Operation\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_UNREACHABLE: error_message = "WebAssembly Trap: Unreachable Instruction\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; default: error_message = "WebAssembly Trap: Unknown Trapno\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; } debuglog("%s", error_message); worker_thread_epoll_remove_sandbox(sandbox); - http_session_close(sandbox->http); generic_thread_dump_lock_overhead(); current_sandbox_exit(); assert(0); @@ -165,7 +157,6 @@ current_sandbox_init() err: debuglog("%s", error_message); worker_thread_epoll_remove_sandbox(sandbox); - http_session_close(sandbox->http); generic_thread_dump_lock_overhead(); current_sandbox_exit(); return NULL; @@ -183,22 +174,12 @@ current_sandbox_fini() sandbox->timestamp_of.completion = __getcycles(); sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.request_arrival; - /* Retrieve the result, construct the HTTP response, and send to client */ - if (http_session_send_response(sandbox->http, sandbox->route->response_content_type, current_sandbox_sleep) - < 0) { - error_message = "Unable to build and send client response\n"; - goto err; - }; - - http_total_increment_2xx(); - sandbox->timestamp_of.response = __getcycles(); assert(sandbox->state == SANDBOX_RUNNING_SYS); worker_thread_epoll_remove_sandbox(sandbox); done: - http_session_close(sandbox->http); sandbox_set_as_returned(sandbox, SANDBOX_RUNNING_SYS); /* Cleanup connection and exit sandbox */ diff --git a/runtime/src/listener_thread.c b/runtime/src/listener_thread.c index aee05ce..14202cb 100644 --- a/runtime/src/listener_thread.c +++ b/runtime/src/listener_thread.c @@ -12,6 +12,18 @@ #include "tenant.h" #include "tenant_functions.h" +static void listener_thread_unregister_http_session(struct http_session *http); +static void panic_on_epoll_error(struct epoll_event *evt); + +static void on_client_socket_epoll_event(struct epoll_event *evt); +static void on_tenant_socket_epoll_event(struct epoll_event *evt); +static void on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant); +static void on_client_request_receiving(struct http_session *session); +static void on_client_request_received(struct http_session *session); +static void on_client_response_header_sending(struct http_session *session); +static void on_client_response_body_sending(struct http_session *session); +static void on_client_response_sent(struct http_session *session); + /* * Descriptor of the epoll instance used to monitor the socket descriptors of registered * serverless modules. The listener cores listens for incoming client requests through this. @@ -61,15 +73,32 @@ listener_thread_register_http_session(struct http_session *http) int rc = 0; struct epoll_event accept_evt; accept_evt.data.ptr = (void *)http; - accept_evt.events = EPOLLIN; - epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt); + switch (http->state) { + case HTTP_SESSION_RECEIVING_REQUEST: + accept_evt.events = EPOLLIN; + http->state = HTTP_SESSION_RECEIVE_REQUEST_BLOCKED; + break; + case HTTP_SESSION_SENDING_RESPONSE_HEADER: + accept_evt.events = EPOLLOUT; + http->state = HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED; + break; + case HTTP_SESSION_SENDING_RESPONSE: + accept_evt.events = EPOLLOUT; + http->state = HTTP_SESSION_SEND_RESPONSE_BLOCKED; + break; + default: + panic("Invalid HTTP Session State: %d\n", http->state); + } + + rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt); + if (rc != 0) { panic("Failed to add http session to listener thread epoll\n"); } } /** * @brief Registers a serverless tenant on the listener thread's epoll descriptor **/ -void +static void listener_thread_unregister_http_session(struct http_session *http) { assert(http != NULL); @@ -78,11 +107,13 @@ listener_thread_unregister_http_session(struct http_session *http) panic("Attempting to unregister an http session before listener thread initialization"); } - epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL); + int rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL); + if (rc != 0) { panic("Failed to remove http session from listener thread epoll\n"); } } /** * @brief Registers a serverless tenant on the listener thread's epoll descriptor + * Assumption: We never have to unregister a tenant **/ int listener_thread_register_tenant(struct tenant *tenant) @@ -117,121 +148,68 @@ panic_on_epoll_error(struct epoll_event *evt) } static void -handle_tcp_requests(struct epoll_event *evt) +on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant) { - assert((evt->events & EPOLLIN) == EPOLLIN); - - /* Unpack tenant from epoll event */ - struct tenant *tenant = (struct tenant *)evt->data.ptr; - assert(tenant); - - /* Accept Client Request as a nonblocking socket, saving address information */ - struct sockaddr_in client_address; - socklen_t address_length = sizeof(client_address); + uint64_t request_arrival_timestamp = __getcycles(); - /* Accept as many requests as possible, returning when we would have blocked */ - while (true) { - int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address, - &address_length, SOCK_NONBLOCK); - if (unlikely(client_socket < 0)) { - if (errno == EWOULDBLOCK || errno == EAGAIN) return; - - panic("accept4: %s", strerror(errno)); - } - - uint64_t request_arrival_timestamp = __getcycles(); - - http_total_increment_request(); - - /* Allocate HTTP Session */ - struct http_session *session = http_session_alloc(client_socket, - (const struct sockaddr *)&client_address, tenant, - request_arrival_timestamp); - - /* Receive HTTP Request */ - int rc = http_session_receive_request(session, listener_thread_register_http_session); - if (rc == -3) { - continue; - } else if (rc == -2) { - debuglog("Request size exceeded Buffer\n"); - http_session_send_err_oneshot(session, 413); - continue; - } else if (rc == -1) { - http_session_send_err_oneshot(session, 400); - continue; - } - - assert(session->http_request.message_end); - - /* Route to sandbox */ - struct route *route = http_router_match_route(&tenant->router, session->http_request.full_url); - if (route == NULL) { - http_session_send_err_oneshot(session, 404); - continue; - } + http_total_increment_request(); - /* - * Perform admissions control. - * If 0, workload was rejected, so close with 429 "Too Many Requests" and continue - * TODO: Consider providing a Retry-After header - */ - uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); - if (work_admitted == 0) { - http_session_send_err_oneshot(session, 429); - continue; - } - - /* Allocate a Sandbox */ - struct sandbox *sandbox = sandbox_alloc(route->module, session, route, tenant, work_admitted); - if (unlikely(sandbox == NULL)) { - http_session_send_err_oneshot(session, 503); - continue; - } - - /* If the global request scheduler is full, return a 429 to the client - */ - sandbox = global_request_scheduler_add(sandbox); - if (unlikely(sandbox == NULL)) { - http_session_send_err_oneshot(session, 429); - sandbox_free(sandbox); - continue; - } + /* Allocate HTTP Session */ + struct http_session *session = http_session_alloc(client_socket, (const struct sockaddr *)&client_address, + tenant, request_arrival_timestamp); + if (likely(session != NULL)) { + on_client_request_receiving(session); + return; + } else { + /* Failed to allocate memory */ + debuglog("Failed to allocate http session\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 500); + on_client_response_header_sending(session); + return; } } static void -resume_blocked_read(struct epoll_event *evt) +on_client_request_receiving(struct http_session *session) { - assert((evt->events & EPOLLIN) == EPOLLIN); - - /* Unpack http session from epoll event */ - struct http_session *session = (struct http_session *)evt->data.ptr; - assert(session); - /* Read HTTP request */ - int rc = http_session_receive_request(session, listener_thread_register_http_session); - if (rc == -3) { + int rc = http_session_receive_request(session, (void_star_cb)listener_thread_register_http_session); + if (likely(rc == 0)) { + on_client_request_received(session); + return; + } else if (rc == -3) { + /* session blocked and registered to epoll so continue to next handle */ return; } else if (rc == -2) { - debuglog("Request size exceeded Buffer\n"); - /* Request size exceeded Buffer, send 413 Payload Too Large */ - listener_thread_unregister_http_session(session); - http_session_send_err_oneshot(session, 413); + /* Failed to grow request buffer */ + debuglog("Failed to grow http request buffer\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 500); + on_client_response_header_sending(session); return; } else if (rc == -1) { - listener_thread_unregister_http_session(session); - http_session_send_err_oneshot(session, 400); + debuglog("Failed to receive or parse request\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 400); + on_client_response_header_sending(session); return; } - assert(session->http_request.message_end); + assert(0); +} - /* We read session to completion, so can remove from epoll */ - listener_thread_unregister_http_session(session); +static void +on_client_request_received(struct http_session *session) +{ + assert(session->state == HTTP_SESSION_RECEIVED_REQUEST); struct route *route = http_router_match_route(&session->tenant->router, session->http_request.full_url); if (route == NULL) { - http_session_send_err_oneshot(session, 404); + debuglog("Did not match any routes\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 404); + on_client_response_header_sending(session); return; } @@ -242,21 +220,129 @@ resume_blocked_read(struct epoll_event *evt) */ uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); if (work_admitted == 0) { - http_session_send_err_oneshot(session, 429); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 429); + on_client_response_header_sending(session); return; } /* Allocate a Sandbox */ + session->state = HTTP_SESSION_EXECUTING; struct sandbox *sandbox = sandbox_alloc(route->module, session, route, session->tenant, work_admitted); if (unlikely(sandbox == NULL)) { - http_session_send_err_oneshot(session, 503); + debuglog("Failed to allocate sandbox\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 500); + on_client_response_header_sending(session); return; } /* If the global request scheduler is full, return a 429 to the client */ if (unlikely(global_request_scheduler_add(sandbox) == NULL)) { - http_session_send_err_oneshot(session, 429); + debuglog("Failed to add sandbox to global queue\n"); sandbox_free(sandbox); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 429); + on_client_response_header_sending(session); + } +} + +static void +on_client_response_header_sending(struct http_session *session) +{ + assert(session->state = HTTP_SESSION_EXECUTION_COMPLETE); + session->state = HTTP_SESSION_SENDING_RESPONSE_HEADER; + + int rc = http_session_send_response_header(session, (void_star_cb)listener_thread_register_http_session); + if (likely(rc == 0)) { + on_client_response_body_sending(session); + return; + } else if (rc == -3) { + /* session blocked and registered to epoll so continue to next handle */ + return; + } else if (rc == -1) { + http_session_close(session); + return; + } +} + +static void +on_client_response_body_sending(struct http_session *session) +{ + /* Read HTTP request */ + int rc = http_session_send_response_body(session, (void_star_cb)listener_thread_register_http_session); + if (likely(rc == 0)) { + on_client_response_sent(session); + return; + } + if (rc == -3) { + /* session blocked and registered to epoll so continue to next handle */ + return; + } else if (rc == -1) { + http_session_close(session); + return; + } +} + +static void +on_client_response_sent(struct http_session *session) +{ + http_session_close(session); + http_session_free(session); + return; +} + +static void +on_tenant_socket_epoll_event(struct epoll_event *evt) +{ + assert((evt->events & EPOLLIN) == EPOLLIN); + + struct tenant *tenant = evt->data.ptr; + assert(tenant); + + /* Accept Client Request as a nonblocking socket, saving address information */ + struct sockaddr_in client_address; + socklen_t address_length = sizeof(client_address); + + /* Accept as many clients requests as possible, returning when we would have blocked */ + while (true) { + int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address, + &address_length, SOCK_NONBLOCK); + if (unlikely(client_socket < 0)) { + if (errno == EWOULDBLOCK || errno == EAGAIN) return; + + panic("accept4: %s", strerror(errno)); + } + + on_client_request_arrival(client_socket, (const struct sockaddr *)&client_address, tenant); + } +} + +static void +on_client_socket_epoll_event(struct epoll_event *evt) +{ + assert(evt); + + struct http_session *session = evt->data.ptr; + assert(session); + + listener_thread_unregister_http_session(session); + + switch (session->state) { + case HTTP_SESSION_RECEIVE_REQUEST_BLOCKED: + assert((evt->events & EPOLLIN) == EPOLLIN); + on_client_request_receiving(session); + break; + case HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED: + assert((evt->events & EPOLLOUT) == EPOLLOUT); + on_client_response_header_sending(session); + break; + case HTTP_SESSION_SEND_RESPONSE_BLOCKED: + assert((evt->events & EPOLLOUT) == EPOLLOUT); + on_client_response_body_sending(session); + break; + default: + panic("Invalid HTTP Session State"); } } @@ -298,9 +384,9 @@ listener_thread_main(void *dummy) panic_on_epoll_error(&epoll_events[i]); if (tenant_database_find_by_ptr(epoll_events[i].data.ptr) != NULL) { - handle_tcp_requests(&epoll_events[i]); + on_tenant_socket_epoll_event(&epoll_events[i]); } else { - resume_blocked_read(&epoll_events[i]); + on_client_socket_epoll_event(&epoll_events[i]); } } generic_thread_dump_lock_overhead(); diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index c9028aa..88c44a0 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -129,7 +129,6 @@ err_stack_allocation_failed: err_memory_allocation_failed: err_globals_allocation_failed: err_http_allocation_failed: - http_session_send_err_oneshot(sandbox->http, 503); sandbox_set_as_error(sandbox, SANDBOX_ALLOCATED); perror(error_message); rc = -1; @@ -205,13 +204,15 @@ sandbox_deinit(struct sandbox *sandbox) assert(sandbox != current_sandbox_get()); assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE); + /* Assumption: HTTP session was migrated to listener core */ + assert(sandbox->http == NULL); + module_release(sandbox->module); /* Linear Memory and Guard Page should already have been munmaped and set to NULL */ assert(sandbox->memory == NULL); if (likely(sandbox->stack != NULL)) sandbox_free_stack(sandbox); - if (likely(sandbox->http != NULL)) http_session_free(sandbox->http); if (likely(sandbox->globals.buffer != NULL)) sandbox_free_globals(sandbox); if (likely(sandbox->wasi_context != NULL)) wasi_context_destroy(sandbox->wasi_context); } From b32503b35047612694059d71ca0325f1d736cdbd Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Fri, 20 May 2022 16:53:45 -0400 Subject: [PATCH 2/8] refactor: tcp_session_recv --- runtime/include/http_session.h | 66 ++++++++++++++-------------------- runtime/include/tcp_session.h | 27 ++++++++++++++ 2 files changed, 53 insertions(+), 40 deletions(-) diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index f4147b6..ad2b6cf 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -274,44 +274,6 @@ http_session_request_buffer_resize(struct http_session *session, int required_si typedef void (*http_session_cb)(struct http_session *); -/* TODO: Why is this not in TCP logic? */ -static inline ssize_t -http_session_receive_raw(struct http_session *session, void_star_cb on_eagain) -{ - assert(session->request_buffer.capacity > session->request_buffer.length); - - ssize_t bytes_received = recv(session->socket, &session->request_buffer.buffer[session->request_buffer.length], - session->request_buffer.capacity - session->request_buffer.length, 0); - - if (bytes_received < 0) { - if (errno == EAGAIN) { - on_eagain(session); - return -EAGAIN; - } else { - debuglog("Error reading socket %d - %s\n", session->socket, strerror(errno)); - return -1; - } - } - - /* If we received an EOF before we were able to parse a complete HTTP message, request is malformed */ - if (bytes_received == 0 && !session->http_request.message_end) { - char client_address_text[INET6_ADDRSTRLEN] = {}; - if (unlikely(inet_ntop(AF_INET, &session->client_address, client_address_text, INET6_ADDRSTRLEN) - == NULL)) { - debuglog("Failed to log client_address: %s", strerror(errno)); - } - - debuglog("recv returned 0 before a complete request was received: socket: %d. Address: " - "%s\n", - session->socket, client_address_text); - http_request_print(&session->http_request); - return -1; - } - - session->request_buffer.length += bytes_received; - return bytes_received; -} - static inline ssize_t http_session_parse(struct http_session *session, ssize_t bytes_received) { @@ -355,6 +317,20 @@ http_session_log_query_params(struct http_session *session) #endif } +static inline void +http_session_log_malformed_request(struct http_session *session) +{ +#ifndef NDEBUG + char client_address_text[INET6_ADDRSTRLEN] = {}; + if (unlikely(inet_ntop(AF_INET, &session->client_address, client_address_text, INET6_ADDRSTRLEN) == NULL)) { + debuglog("Failed to log client_address: %s", strerror(errno)); + } + + debuglog("socket: %d. Address: %s\n", session->socket, client_address_text); + http_request_print(&session->http_request); +#endif +} + /** * Receive and Parse the Request for the current sandbox * @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space, -3 EAGAIN @@ -386,10 +362,17 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai if (rc != 0) goto err_nobufs; } - /* TODO: Why is this a call to TCP receive? */ - ssize_t bytes_received = http_session_receive_raw(session, on_eagain); + ssize_t bytes_received = + tcp_session_recv(session->socket, + (char *)&session->request_buffer.buffer[session->request_buffer.length], + session->request_buffer.capacity - session->request_buffer.length, on_eagain, + session); if (bytes_received == -EAGAIN) goto err_eagain; if (bytes_received == -1) goto err; + /* If we received an EOF before we were able to parse a complete HTTP message, request is malformed */ + if (bytes_received == 0 && !session->http_request.message_end) goto err; + + session->request_buffer.length += bytes_received; ssize_t bytes_parsed = http_session_parse(session, bytes_received); if (bytes_parsed == -1) goto err; @@ -404,12 +387,15 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai done: return rc; err_eagain: + http_session_log_malformed_request(session); rc = -3; goto done; err_nobufs: + http_session_log_malformed_request(session); rc = -2; goto done; err: + http_session_log_malformed_request(session); rc = -1; goto done; } diff --git a/runtime/include/tcp_session.h b/runtime/include/tcp_session.h index 4e1ea8e..ddb32b9 100644 --- a/runtime/include/tcp_session.h +++ b/runtime/include/tcp_session.h @@ -57,3 +57,30 @@ tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_ return sent; } + +/** + * Writes buffer to the client socket + * @param client_socket - the client + * @param buffer - buffer to reach the socket into + * @param buffer_len - buffer to reach the socket into + * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out + * @returns nwritten on success, -1 on error, -2 unused, -3 on eagain + */ +static inline ssize_t +tcp_session_recv(int client_socket, char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr) +{ + assert(buffer != NULL); + assert(buffer_len > 0); + + ssize_t received = read(client_socket, buffer, buffer_len); + if (received < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (on_eagain != NULL) on_eagain(dataptr); + return -3; + } else { + return -1; + } + } + + return received; +} From 9de83c5ac33c98b832547b18e48a11ab9f4c0ba2 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Fri, 20 May 2022 17:45:53 -0400 Subject: [PATCH 3/8] refactor: restore content-length and content-type in response --- runtime/include/http.h | 19 +++++++------- runtime/include/http_session.h | 32 +++++++++++++++++------ runtime/include/sandbox_set_as_error.h | 2 +- runtime/include/sandbox_set_as_returned.h | 3 ++- runtime/src/listener_thread.c | 14 +++++----- 5 files changed, 43 insertions(+), 27 deletions(-) diff --git a/runtime/include/http.h b/runtime/include/http.h index df3b9f5..cab6f34 100644 --- a/runtime/include/http.h +++ b/runtime/include/http.h @@ -13,12 +13,17 @@ #define HTTP_MAX_QUERY_PARAM_COUNT 16 #define HTTP_MAX_QUERY_PARAM_LENGTH 32 -#define HTTP_RESPONSE_200_OK \ - "HTTP/1.1 200 OK\r\n" \ - "Server: SLEdge\r\n" \ - "Connection: close\r\n" \ +#define HTTP_RESPONSE_200_TEMPLATE \ + "HTTP/1.1 200 OK\r\n" \ + "Server: SLEdge\r\n" \ + "Connection: close\r\n" \ + "Content-Type: %s\r\n" \ + "Content-Length: %lu\r\n" \ "\r\n" +/* The sum of format specifier characters in the template above */ +#define HTTP_RESPONSE_200_TEMPLATE_FORMAT_SPECIFIER_LENGTH 5 + #define HTTP_RESPONSE_400_BAD_REQUEST \ "HTTP/1.1 400 Bad Request\r\n" \ "Server: SLEdge\r\n" \ @@ -61,10 +66,6 @@ http_header_build(int status_code) const char *response; int rc; switch (status_code) { - case 200: - response = HTTP_RESPONSE_200_OK; - http_total_increment_2XX(); - break; case 400: response = HTTP_RESPONSE_400_BAD_REQUEST; http_total_increment_4XX(); @@ -100,8 +101,6 @@ static inline size_t http_header_len(int status_code) { switch (status_code) { - case 200: - return strlen(HTTP_RESPONSE_200_OK); case 400: return strlen(HTTP_RESPONSE_400_BAD_REQUEST); case 404: diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index ad2b6cf..5e3acf8 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -38,6 +38,8 @@ enum http_session_state HTTP_SESSION_SENT_RESPONSE }; +#define HTTP_SESSION_RESPONSE_HEADER_CAPACITY 256 + struct http_session { enum http_session_state state; struct sockaddr client_address; /* client requesting connection! */ @@ -45,7 +47,7 @@ struct http_session { struct http_parser http_parser; struct http_request http_request; struct vec_u8 request_buffer; - const char *response_header; + char response_header[HTTP_SESSION_RESPONSE_HEADER_CAPACITY]; size_t response_header_length; size_t response_header_written; struct vec_u8 response_buffer; @@ -161,13 +163,25 @@ http_session_free(struct http_session *session) * @param status_code */ static inline void -http_session_set_response_header(struct http_session *session, int status_code) +http_session_set_response_header(struct http_session *session, int status_code, const char *content_type, + size_t content_length) { assert(session != NULL); - assert(status_code >= 100 && status_code <= 599); - - session->response_header = http_header_build(status_code); - session->response_header_length = http_header_len(status_code); + assert(status_code >= 200 && status_code <= 599); + + if (status_code == 200) { + session->response_header_length = snprintf(session->response_header, + HTTP_SESSION_RESPONSE_HEADER_CAPACITY, + HTTP_RESPONSE_200_TEMPLATE, content_type, content_length); + } else { + size_t header_len = http_header_len(status_code); + size_t to_copy = HTTP_SESSION_RESPONSE_HEADER_CAPACITY < header_len + ? HTTP_SESSION_RESPONSE_HEADER_CAPACITY + : header_len; + + strncpy(session->response_header, http_header_build(status_code), to_copy - 1); + session->response_header_length = to_copy; + } } static inline void @@ -215,7 +229,6 @@ static inline int http_session_send_response_body(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); - assert(session->response_buffer.buffer != NULL); while (session->response_buffer_written < session->response_buffer.length) { ssize_t sent = @@ -367,11 +380,14 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai (char *)&session->request_buffer.buffer[session->request_buffer.length], session->request_buffer.capacity - session->request_buffer.length, on_eagain, session); - if (bytes_received == -EAGAIN) goto err_eagain; + if (bytes_received == -3) goto err_eagain; if (bytes_received == -1) goto err; /* If we received an EOF before we were able to parse a complete HTTP message, request is malformed */ if (bytes_received == 0 && !session->http_request.message_end) goto err; + assert(bytes_received > 0); + assert(session->request_buffer.length < session->request_buffer.capacity); + session->request_buffer.length += bytes_received; ssize_t bytes_parsed = http_session_parse(session, bytes_received); diff --git a/runtime/include/sandbox_set_as_error.h b/runtime/include/sandbox_set_as_error.h index 4b949d8..35b6be9 100644 --- a/runtime/include/sandbox_set_as_error.h +++ b/runtime/include/sandbox_set_as_error.h @@ -59,7 +59,7 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state) admissions_control_subtract(sandbox->admissions_estimate); /* Return HTTP session to listener core to be written back to client */ - http_session_set_response_header(sandbox->http, 500); + http_session_set_response_header(sandbox->http, 500, NULL, 0); sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE; http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session); sandbox->http = NULL; diff --git a/runtime/include/sandbox_set_as_returned.h b/runtime/include/sandbox_set_as_returned.h index f54e49a..3a20fc9 100644 --- a/runtime/include/sandbox_set_as_returned.h +++ b/runtime/include/sandbox_set_as_returned.h @@ -51,7 +51,8 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state) sandbox_state_totals_increment(SANDBOX_RETURNED); sandbox_state_totals_decrement(last_state); - http_session_set_response_header(sandbox->http, 200); + http_session_set_response_header(sandbox->http, 200, sandbox->route->response_content_type, + sandbox->http->response_buffer.length); sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE; http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session); sandbox->http = NULL; diff --git a/runtime/src/listener_thread.c b/runtime/src/listener_thread.c index 14202cb..5ab1c65 100644 --- a/runtime/src/listener_thread.c +++ b/runtime/src/listener_thread.c @@ -164,7 +164,7 @@ on_client_request_arrival(int client_socket, const struct sockaddr *client_addre /* Failed to allocate memory */ debuglog("Failed to allocate http session\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 500); + http_session_set_response_header(session, 500, NULL, 0); on_client_response_header_sending(session); return; } @@ -185,13 +185,13 @@ on_client_request_receiving(struct http_session *session) /* Failed to grow request buffer */ debuglog("Failed to grow http request buffer\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 500); + http_session_set_response_header(session, 500, NULL, 0); on_client_response_header_sending(session); return; } else if (rc == -1) { debuglog("Failed to receive or parse request\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 400); + http_session_set_response_header(session, 400, NULL, 0); on_client_response_header_sending(session); return; } @@ -208,7 +208,7 @@ on_client_request_received(struct http_session *session) if (route == NULL) { debuglog("Did not match any routes\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 404); + http_session_set_response_header(session, 404, NULL, 0); on_client_response_header_sending(session); return; } @@ -221,7 +221,7 @@ on_client_request_received(struct http_session *session) uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); if (work_admitted == 0) { session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 429); + http_session_set_response_header(session, 429, NULL, 0); on_client_response_header_sending(session); return; } @@ -232,7 +232,7 @@ on_client_request_received(struct http_session *session) if (unlikely(sandbox == NULL)) { debuglog("Failed to allocate sandbox\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 500); + http_session_set_response_header(session, 500, NULL, 0); on_client_response_header_sending(session); return; } @@ -242,7 +242,7 @@ on_client_request_received(struct http_session *session) debuglog("Failed to add sandbox to global queue\n"); sandbox_free(sandbox); session->state = HTTP_SESSION_EXECUTION_COMPLETE; - http_session_set_response_header(session, 429); + http_session_set_response_header(session, 429, NULL, 0); on_client_response_header_sending(session); } } From 50c7413f00b1da40726955aaeea0b9c86c6c6c0c Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Fri, 20 May 2022 18:06:48 -0400 Subject: [PATCH 4/8] fix: Remove merge conflict mistake --- runtime/include/sandbox_set_as_error.h | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/include/sandbox_set_as_error.h b/runtime/include/sandbox_set_as_error.h index 13b3ecf..4525264 100644 --- a/runtime/include/sandbox_set_as_error.h +++ b/runtime/include/sandbox_set_as_error.h @@ -5,7 +5,6 @@ #include "arch/getcycles.h" #include "listener_thread.h" -#include "local_completion_queue.h" #include "local_runqueue.h" #include "sandbox_state.h" #include "sandbox_functions.h" From 04b7f4d2df72c4a15eca81dd0328cb4c0bd8eaf7 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Mon, 23 May 2022 10:15:52 -0400 Subject: [PATCH 5/8] refactor: Remove zombie http-session variable --- runtime/src/current_sandbox.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/runtime/src/current_sandbox.c b/runtime/src/current_sandbox.c index 1250f8e..ef6469f 100644 --- a/runtime/src/current_sandbox.c +++ b/runtime/src/current_sandbox.c @@ -81,8 +81,6 @@ current_sandbox_wasm_trap_handler(int trapno) struct sandbox *sandbox = current_sandbox_get(); sandbox_syscall(sandbox); - struct http_session *session = sandbox->http; - switch (trapno) { case WASM_TRAP_INVALID_INDEX: error_message = "WebAssembly Trap: Invalid Index\n"; From 9e200ee93abcea2681c17e7b6235b5ed7041afa6 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Thu, 2 Jun 2022 12:55:33 -0400 Subject: [PATCH 6/8] fix: strip logs on http_session eagain --- runtime/include/http_session.h | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index d83a788..e6e4e7d 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -403,7 +403,6 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai done: return rc; err_eagain: - http_session_log_malformed_request(session); rc = -3; goto done; err_nobufs: From 1db1f9a3960cacd10c76da0613a882caaf18a02b Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Wed, 15 Jun 2022 15:17:07 -0400 Subject: [PATCH 7/8] refactor: Address Emil feedback --- runtime/include/http_session.h | 47 +++++++++++++--------- runtime/include/sandbox_perf_log.h | 2 +- runtime/include/sandbox_set_as_allocated.h | 1 + runtime/include/sandbox_set_as_returned.h | 1 - runtime/include/sandbox_set_as_runnable.h | 1 + runtime/include/sandbox_types.h | 3 +- runtime/include/tcp_session.h | 12 +++--- runtime/include/wasm_stack.h | 3 +- runtime/src/current_sandbox.c | 4 +- runtime/src/listener_thread.c | 19 +++++---- runtime/src/sandbox.c | 3 +- 11 files changed, 52 insertions(+), 44 deletions(-) diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index e6e4e7d..743b7de 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -54,6 +54,7 @@ struct http_session { size_t response_buffer_written; struct tenant *tenant; /* Backlink required when read blocks on listener core */ uint64_t request_arrival_timestamp; + uint64_t response_sent_timestamp; }; /** @@ -196,7 +197,7 @@ http_session_close(struct http_session *session) * Writes an HTTP header to the client * @param client_socket - the client * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns 0 on success, -1 on error, -2 unused, -3 on eagain + * @returns 0 on success, -errno on error */ static inline int http_session_send_response_header(struct http_session *session, void_star_cb on_eagain) @@ -223,7 +224,7 @@ http_session_send_response_header(struct http_session *session, void_star_cb on_ * Writes an HTTP body to the client * @param client_socket - the client * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns 0 on success, -1 on error, -2 unused, -3 on eagain + * @returns 0 on success, -errno on error */ static inline int http_session_send_response_body(struct http_session *session, void_star_cb on_eagain) @@ -346,7 +347,7 @@ http_session_log_malformed_request(struct http_session *session) /** * Receive and Parse the Request for the current sandbox - * @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space, -3 EAGAIN + * @return 0 if message parsing complete, -1 on error, -ENOMEM if buffers run out of space, -3 EAGAIN if would block */ static inline int http_session_receive_request(struct http_session *session, void_star_cb on_eagain) @@ -380,10 +381,13 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai (char *)&session->request_buffer.buffer[session->request_buffer.length], session->request_buffer.capacity - session->request_buffer.length, on_eagain, session); - if (bytes_received == -3) goto err_eagain; - if (bytes_received == -1) goto err; + if (unlikely(bytes_received == -EAGAIN)) + goto err_eagain; + else if (unlikely(bytes_received < 0)) + goto err; /* If we received an EOF before we were able to parse a complete HTTP message, request is malformed */ - if (bytes_received == 0 && !session->http_request.message_end) goto err; + else if (unlikely(bytes_received == 0 && !session->http_request.message_end)) + goto err; assert(bytes_received > 0); assert(session->request_buffer.length < session->request_buffer.capacity); @@ -403,11 +407,11 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai done: return rc; err_eagain: - rc = -3; + rc = -EAGAIN; goto done; err_nobufs: http_session_log_malformed_request(session); - rc = -2; + rc = -ENOMEM; goto done; err: http_session_log_malformed_request(session); @@ -449,23 +453,26 @@ static inline void http_session_send_response(struct http_session *session, void_star_cb on_eagain) { int rc = http_session_send_response_header(session, on_eagain); - if (unlikely(rc == -3)) { - /* session blocked and registered to epoll so continue to next handle */ - return; - } else if (unlikely(rc == -1)) { - http_session_close(session); - return; + /* session blocked and registered to epoll so continue to next handle */ + if (unlikely(rc == -EAGAIN)) { + goto DONE; + } else if (unlikely(rc < 0)) { + goto ERR; } rc = http_session_send_response_body(session, on_eagain); - if (unlikely(rc == -3)) { - /* session blocked and registered to epoll so continue to next handle */ - return; - } else if (unlikely(rc == -1)) { - http_session_close(session); - return; + /* session blocked and registered to epoll so continue to next handle */ + if (unlikely(rc == -EAGAIN)) { + goto DONE; + } else if (unlikely(rc < 0)) { + goto ERR; } + session->response_sent_timestamp = __getcycles(); + +DONE: + return; +ERR: http_session_close(session); http_session_free(session); } diff --git a/runtime/include/sandbox_perf_log.h b/runtime/include/sandbox_perf_log.h index e76261a..6becea3 100644 --- a/runtime/include/sandbox_perf_log.h +++ b/runtime/include/sandbox_perf_log.h @@ -29,7 +29,7 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox) /* If the log was not defined by an environment variable, early out */ if (sandbox_perf_log == NULL) return; - uint64_t queued_duration = sandbox->timestamp_of.allocation - sandbox->timestamp_of.request_arrival; + uint64_t queued_duration = sandbox->timestamp_of.dispatched - sandbox->timestamp_of.allocation; /* * Assumption: A sandbox is never able to free pages. If linear memory management diff --git a/runtime/include/sandbox_set_as_allocated.h b/runtime/include/sandbox_set_as_allocated.h index f77b215..89a5fe7 100644 --- a/runtime/include/sandbox_set_as_allocated.h +++ b/runtime/include/sandbox_set_as_allocated.h @@ -24,6 +24,7 @@ sandbox_set_as_allocated(struct sandbox *sandbox) /* State Change Bookkeeping */ assert(now > sandbox->timestamp_of.last_state_change); + sandbox->timestamp_of.allocation = now; sandbox->timestamp_of.last_state_change = now; sandbox_state_history_init(&sandbox->state_history); sandbox_state_history_append(&sandbox->state_history, SANDBOX_ALLOCATED); diff --git a/runtime/include/sandbox_set_as_returned.h b/runtime/include/sandbox_set_as_returned.h index 3a20fc9..88cc472 100644 --- a/runtime/include/sandbox_set_as_returned.h +++ b/runtime/include/sandbox_set_as_returned.h @@ -31,7 +31,6 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state) switch (last_state) { case SANDBOX_RUNNING_SYS: { - sandbox->timestamp_of.response = now; local_runqueue_delete(sandbox); sandbox_free_linear_memory(sandbox); break; diff --git a/runtime/include/sandbox_set_as_runnable.h b/runtime/include/sandbox_set_as_runnable.h index 8e28875..6b404d5 100644 --- a/runtime/include/sandbox_set_as_runnable.h +++ b/runtime/include/sandbox_set_as_runnable.h @@ -30,6 +30,7 @@ sandbox_set_as_runnable(struct sandbox *sandbox, sandbox_state_t last_state) switch (last_state) { case SANDBOX_INITIALIZED: { + sandbox->timestamp_of.dispatched = now; local_runqueue_add(sandbox); break; } diff --git a/runtime/include/sandbox_types.h b/runtime/include/sandbox_types.h index f9f2ca1..796c4b3 100644 --- a/runtime/include/sandbox_types.h +++ b/runtime/include/sandbox_types.h @@ -21,9 +21,8 @@ struct sandbox_timestamps { uint64_t last_state_change; /* Used for bookkeeping of actual execution time */ - uint64_t request_arrival; /* Timestamp when request is received */ uint64_t allocation; /* Timestamp when sandbox is allocated */ - uint64_t response; /* Timestamp when response is sent */ + uint64_t dispatched; /* Timestamp when a sandbox is first added to a worker's runqueue */ uint64_t completion; /* Timestamp when sandbox runs to completion */ #ifdef LOG_SANDBOX_MEMORY_PROFILE uint32_t page_allocations[SANDBOX_PAGE_ALLOCATION_TIMESTAMP_COUNT]; diff --git a/runtime/include/tcp_session.h b/runtime/include/tcp_session.h index ddb32b9..8baab85 100644 --- a/runtime/include/tcp_session.h +++ b/runtime/include/tcp_session.h @@ -37,7 +37,7 @@ typedef void (*void_star_cb)(void *); * @param client_socket - the client * @param buffer - buffer to write to socket * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns nwritten on success, -1 on error, -2 unused, -3 on eagain + * @returns nwritten on success, -errno, -EAGAIN on block */ static inline ssize_t tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr) @@ -49,9 +49,9 @@ tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_ if (sent < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if (on_eagain != NULL) on_eagain(dataptr); - return -3; + return -EAGAIN; } else { - return -1; + return -errno; } } @@ -64,7 +64,7 @@ tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_ * @param buffer - buffer to reach the socket into * @param buffer_len - buffer to reach the socket into * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns nwritten on success, -1 on error, -2 unused, -3 on eagain + * @returns nwritten on success, -errno on error, -eagain on block */ static inline ssize_t tcp_session_recv(int client_socket, char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr) @@ -76,9 +76,9 @@ tcp_session_recv(int client_socket, char *buffer, size_t buffer_len, void_star_c if (received < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { if (on_eagain != NULL) on_eagain(dataptr); - return -3; + return -EAGAIN; } else { - return -1; + return -errno; } } diff --git a/runtime/include/wasm_stack.h b/runtime/include/wasm_stack.h index 1821172..95d5ac3 100644 --- a/runtime/include/wasm_stack.h +++ b/runtime/include/wasm_stack.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "ps_list.h" #include "types.h" @@ -124,6 +125,6 @@ wasm_stack_reinit(struct wasm_stack *wasm_stack) assert(wasm_stack->low == wasm_stack->buffer + /* guard page */ PAGE_SIZE); assert(wasm_stack->high == wasm_stack->low + wasm_stack->capacity); - memset(wasm_stack->low, 0, wasm_stack->capacity); + explicit_bzero(wasm_stack->low, wasm_stack->capacity); ps_list_init_d(wasm_stack); } diff --git a/runtime/src/current_sandbox.c b/runtime/src/current_sandbox.c index ef6469f..1d3f43e 100644 --- a/runtime/src/current_sandbox.c +++ b/runtime/src/current_sandbox.c @@ -170,9 +170,7 @@ current_sandbox_fini() sandbox_syscall(sandbox); sandbox->timestamp_of.completion = __getcycles(); - sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.request_arrival; - - sandbox->timestamp_of.response = __getcycles(); + sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.allocation; assert(sandbox->state == SANDBOX_RUNNING_SYS); worker_thread_epoll_remove_sandbox(sandbox); diff --git a/runtime/src/listener_thread.c b/runtime/src/listener_thread.c index 5ab1c65..4553487 100644 --- a/runtime/src/listener_thread.c +++ b/runtime/src/listener_thread.c @@ -178,17 +178,17 @@ on_client_request_receiving(struct http_session *session) if (likely(rc == 0)) { on_client_request_received(session); return; - } else if (rc == -3) { + } else if (unlikely(rc == -EAGAIN)) { /* session blocked and registered to epoll so continue to next handle */ return; - } else if (rc == -2) { + } else if (unlikely(rc == -ENOMEM)) { /* Failed to grow request buffer */ debuglog("Failed to grow http request buffer\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; http_session_set_response_header(session, 500, NULL, 0); on_client_response_header_sending(session); return; - } else if (rc == -1) { + } else if (rc < 0) { debuglog("Failed to receive or parse request\n"); session->state = HTTP_SESSION_EXECUTION_COMPLETE; http_session_set_response_header(session, 400, NULL, 0); @@ -257,11 +257,12 @@ on_client_response_header_sending(struct http_session *session) if (likely(rc == 0)) { on_client_response_body_sending(session); return; - } else if (rc == -3) { + } else if (unlikely(rc == -EAGAIN)) { /* session blocked and registered to epoll so continue to next handle */ return; - } else if (rc == -1) { + } else if (rc < 0) { http_session_close(session); + http_session_free(session); return; } } @@ -274,12 +275,12 @@ on_client_response_body_sending(struct http_session *session) if (likely(rc == 0)) { on_client_response_sent(session); return; - } - if (rc == -3) { + } else if (unlikely(rc == -EAGAIN)) { /* session blocked and registered to epoll so continue to next handle */ return; - } else if (rc == -1) { + } else if (unlikely(rc < 0)) { http_session_close(session); + http_session_free(session); return; } } @@ -287,6 +288,8 @@ on_client_response_body_sending(struct http_session *session) static void on_client_response_sent(struct http_session *session) { + session->response_sent_timestamp = __getcycles(); + http_session_close(session); http_session_free(session); return; diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index 88c44a0..88cf3f8 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -153,8 +153,7 @@ sandbox_init(struct sandbox *sandbox, struct module *module, struct http_session sandbox->tenant = tenant; sandbox->route = route; - sandbox->timestamp_of.request_arrival = session->request_arrival_timestamp; - sandbox->absolute_deadline = session->request_arrival_timestamp + sandbox->route->relative_deadline; + sandbox->absolute_deadline = sandbox->timestamp_of.allocation + sandbox->route->relative_deadline; /* * Admissions Control State From 5fa65d2f0bb5072044fb059f8416aa1ba5592ac2 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Wed, 15 Jun 2022 15:51:32 -0400 Subject: [PATCH 8/8] fix: correct flush when zero blocking --- runtime/include/http_session.h | 10 +++++----- tests/traps/Makefile | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index 743b7de..6f7998a 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -457,7 +457,7 @@ http_session_send_response(struct http_session *session, void_star_cb on_eagain) if (unlikely(rc == -EAGAIN)) { goto DONE; } else if (unlikely(rc < 0)) { - goto ERR; + goto CLOSE; } rc = http_session_send_response_body(session, on_eagain); @@ -465,14 +465,14 @@ http_session_send_response(struct http_session *session, void_star_cb on_eagain) if (unlikely(rc == -EAGAIN)) { goto DONE; } else if (unlikely(rc < 0)) { - goto ERR; + goto CLOSE; } session->response_sent_timestamp = __getcycles(); -DONE: - return; -ERR: +CLOSE: http_session_close(session); http_session_free(session); +DONE: + return; } diff --git a/tests/traps/Makefile b/tests/traps/Makefile index e3285b0..c609122 100644 --- a/tests/traps/Makefile +++ b/tests/traps/Makefile @@ -31,6 +31,7 @@ debug: sledgert trap_divzero LD_LIBRARY_PATH=${SLEDGE_BINARY_DIR} gdb ${SLEDGE_BINARY_DIR}/sledgert \ --eval-command="handle SIGUSR1 noprint nostop" \ --eval-command="handle SIGPIPE noprint nostop" \ + --eval-command="handle SIGFPE noprint nostop" \ --eval-command="set pagination off" \ --eval-command="run spec.json" @@ -38,7 +39,7 @@ client-ok: echo "1" | http :10000/divide client-trap: - echo "0" | http :10000/divide + echo "0" | http --timeout 3600 :10000/divide client-trap2: echo "-1" | http :10000/divide