diff --git a/runtime/include/http.h b/runtime/include/http.h index 9ed8f5e..cab6f34 100644 --- a/runtime/include/http.h +++ b/runtime/include/http.h @@ -60,12 +60,6 @@ "Connection: close\r\n" \ "\r\n" -static inline int -http_header_200_write(int fd, const char *content_type, size_t content_length) -{ - return dprintf(fd, HTTP_RESPONSE_200_TEMPLATE, content_type, content_length); -} - static inline const char * http_header_build(int status_code) { @@ -103,7 +97,7 @@ http_header_build(int status_code) return response; } -static inline int +static inline size_t http_header_len(int status_code) { switch (status_code) { diff --git a/runtime/include/http_session.h b/runtime/include/http_session.h index 37266a7..6f7998a 100644 --- a/runtime/include/http_session.h +++ b/runtime/include/http_session.h @@ -22,19 +22,57 @@ #define u8 uint8_t VEC(u8) +enum http_session_state +{ + HTTP_SESSION_UNINITIALIZED = 0, + HTTP_SESSION_INITIALIZED, + HTTP_SESSION_RECEIVING_REQUEST, + HTTP_SESSION_RECEIVE_REQUEST_BLOCKED, + HTTP_SESSION_RECEIVED_REQUEST, + HTTP_SESSION_EXECUTING, + HTTP_SESSION_EXECUTION_COMPLETE, + HTTP_SESSION_SENDING_RESPONSE_HEADER, + HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED, + HTTP_SESSION_SENDING_RESPONSE, + HTTP_SESSION_SEND_RESPONSE_BLOCKED, + HTTP_SESSION_SENT_RESPONSE +}; + +#define HTTP_SESSION_RESPONSE_HEADER_CAPACITY 256 + struct http_session { - struct sockaddr client_address; /* client requesting connection! */ - int socket; - struct http_parser http_parser; - struct http_request http_request; - struct vec_u8 request_buffer; - struct vec_u8 response_buffer; - struct tenant *tenant; /* Backlink required when read blocks on listener core */ - uint64_t request_arrival_timestamp; + enum http_session_state state; + struct sockaddr client_address; /* client requesting connection! */ + int socket; + struct http_parser http_parser; + struct http_request http_request; + struct vec_u8 request_buffer; + char response_header[HTTP_SESSION_RESPONSE_HEADER_CAPACITY]; + size_t response_header_length; + size_t response_header_written; + struct vec_u8 response_buffer; + size_t response_buffer_written; + struct tenant *tenant; /* Backlink required when read blocks on listener core */ + uint64_t request_arrival_timestamp; + uint64_t response_sent_timestamp; }; /** - * @param session sandbox that we want to init + * Initalize state associated with an http parser + * Because the http_parser structure uses pointers to the request buffer, if realloc moves the request + * buffer, this should be called to clear stale state to force parsing to restart + */ +static inline void +http_session_parser_init(struct http_session *session) +{ + memset(&session->http_request, 0, sizeof(struct http_request)); + http_parser_init(&session->http_parser, HTTP_REQUEST); + /* Set the session as the data the http-parser has access to */ + session->http_parser.data = &session->http_request; +} + +/** + * @param session session that we want to init * @returns 0 on success, -1 on error */ static inline int @@ -42,6 +80,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str struct tenant *tenant, uint64_t request_arrival_timestamp) { assert(session != NULL); + assert(session->state == HTTP_SESSION_UNINITIALIZED); assert(socket_descriptor >= 0); assert(socket_address != NULL); @@ -50,12 +89,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str session->request_arrival_timestamp = request_arrival_timestamp; memcpy(&session->client_address, socket_address, sizeof(struct sockaddr)); - http_parser_init(&session->http_parser, HTTP_REQUEST); - - /* Set the http_request member as the data pointer the http_parser callbacks receive */ - session->http_parser.data = &session->http_request; - - memset(&session->http_request, 0, sizeof(struct http_request)); + http_session_parser_init(session); int rc = vec_u8_init(&session->request_buffer, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE); if (rc < 0) return -1; @@ -63,6 +97,8 @@ http_session_init(struct http_session *session, int socket_descriptor, const str /* Defer allocating response until we've matched a route */ session->response_buffer.buffer = NULL; + session->state = HTTP_SESSION_INITIALIZED; + return 0; } @@ -123,22 +159,32 @@ http_session_free(struct http_session *session) } /** - * Writes buffer to the client socket - * @param session - the HTTP session we want to send a 500 to - * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns 0 on success, -1 on error. + * Set Response Header + * @param session - the HTTP session we want to set the response header of + * @param status_code */ -static inline int -http_session_send_err(struct http_session *session, int status_code, void_cb on_eagain) +static inline void +http_session_set_response_header(struct http_session *session, int status_code, const char *content_type, + size_t content_length) { assert(session != NULL); - assert(status_code >= 100 && status_code <= 599); - - return tcp_session_send(session->socket, http_header_build(status_code), http_header_len(status_code), - on_eagain); + assert(status_code >= 200 && status_code <= 599); + + if (status_code == 200) { + session->response_header_length = snprintf(session->response_header, + HTTP_SESSION_RESPONSE_HEADER_CAPACITY, + HTTP_RESPONSE_200_TEMPLATE, content_type, content_length); + } else { + size_t header_len = http_header_len(status_code); + size_t to_copy = HTTP_SESSION_RESPONSE_HEADER_CAPACITY < header_len + ? HTTP_SESSION_RESPONSE_HEADER_CAPACITY + : header_len; + + strncpy(session->response_header, http_header_build(status_code), to_copy - 1); + session->response_header_length = to_copy; + } } - static inline void http_session_close(struct http_session *session) { @@ -148,52 +194,57 @@ http_session_close(struct http_session *session) } /** - * Writes an HTTP error code to the TCP socket associated with the session - * Closes without writing if TCP socket would have blocked - * Also cleans up the HTTP session + * Writes an HTTP header to the client + * @param client_socket - the client + * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out + * @returns 0 on success, -errno on error */ static inline int -http_session_send_err_oneshot(struct http_session *session, int status_code) +http_session_send_response_header(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); - assert(status_code >= 100 && status_code <= 599); - int rc = tcp_session_send_oneshot(session->socket, http_header_build(status_code), - http_header_len(status_code)); - http_session_close(session); - http_session_free(session); + while (session->response_header_length > session->response_header_written) { + ssize_t sent = + tcp_session_send(session->socket, + (const char *)&session->response_header[session->response_header_written], + session->response_header_length - session->response_header_written, on_eagain, + session); + if (sent < 0) { + return (int)sent; + } else { + session->response_header_written += (size_t)sent; + } + } - return rc; + return 0; } +/** + * Writes an HTTP body to the client + * @param client_socket - the client + * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out + * @returns 0 on success, -errno on error + */ static inline int -http_session_send_response(struct http_session *session, const char *response_content_type, void_cb on_eagain) +http_session_send_response_body(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); - assert(session->response_buffer.buffer != NULL); - assert(response_content_type != NULL); - - int rc = 0; - - struct vec_u8 *response_buffer = &session->response_buffer; - /* Send HTTP Response Header and Body */ - rc = http_header_200_write(session->socket, response_content_type, response_buffer->length); - if (rc < 0) goto err; - - rc = tcp_session_send(session->socket, (const char *)response_buffer->buffer, response_buffer->length, - on_eagain); - if (rc < 0) goto err; - - http_total_increment_2xx(); - rc = 0; + while (session->response_buffer_written < session->response_buffer.length) { + ssize_t sent = + tcp_session_send(session->socket, + (const char *)&session->response_buffer.buffer[session->response_buffer_written], + session->response_buffer.length - session->response_buffer_written, on_eagain, + session); + if (sent < 0) { + return (int)sent; + } else { + session->response_buffer_written += (size_t)sent; + } + } -done: - return rc; -err: - debuglog("Error sending to client: %s", strerror(errno)); - rc = -1; - goto done; + return 0; } static inline bool @@ -202,20 +253,6 @@ http_session_request_buffer_is_full(struct http_session *session) return session->request_buffer.length == session->request_buffer.capacity; } -/** - * Initalize state associated with an http parser - * Because the http_parser structure uses pointers to the request buffer, if realloc moves the request - * buffer, this should be called to clear stale state to force parsing to restart - */ -static inline void -http_session_parser_init(struct http_session *session) -{ - memset(&session->http_request, 0, sizeof(struct http_request)); - http_parser_init(&session->http_parser, HTTP_REQUEST); - /* Set the session as the data the http-parser has access to */ - session->http_parser.data = &session->http_request; -} - static inline int http_session_request_buffer_grow(struct http_session *session) { @@ -249,44 +286,7 @@ http_session_request_buffer_resize(struct http_session *session, int required_si return 0; } -typedef void (*http_session_receive_request_egain_cb)(struct http_session *); - -static inline ssize_t -http_session_receive_raw(struct http_session *session, http_session_receive_request_egain_cb on_eagain) -{ - assert(session->request_buffer.capacity > session->request_buffer.length); - - ssize_t bytes_received = recv(session->socket, &session->request_buffer.buffer[session->request_buffer.length], - session->request_buffer.capacity - session->request_buffer.length, 0); - - if (bytes_received < 0) { - if (errno == EAGAIN) { - on_eagain(session); - return -EAGAIN; - } else { - debuglog("Error reading socket %d - %s\n", session->socket, strerror(errno)); - return -1; - } - } - - /* If we received an EOF before we were able to parse a complete HTTP message, request is malformed */ - if (bytes_received == 0 && !session->http_request.message_end) { - char client_address_text[INET6_ADDRSTRLEN] = {}; - if (unlikely(inet_ntop(AF_INET, &session->client_address, client_address_text, INET6_ADDRSTRLEN) - == NULL)) { - debuglog("Failed to log client_address: %s", strerror(errno)); - } - - debuglog("recv returned 0 before a complete request was received: socket: %d. Address: " - "%s\n", - session->socket, client_address_text); - http_request_print(&session->http_request); - return -1; - } - - session->request_buffer.length += bytes_received; - return bytes_received; -} +typedef void (*http_session_cb)(struct http_session *); static inline ssize_t http_session_parse(struct http_session *session, ssize_t bytes_received) @@ -331,20 +331,35 @@ http_session_log_query_params(struct http_session *session) #endif } +static inline void +http_session_log_malformed_request(struct http_session *session) +{ +#ifndef NDEBUG + char client_address_text[INET6_ADDRSTRLEN] = {}; + if (unlikely(inet_ntop(AF_INET, &session->client_address, client_address_text, INET6_ADDRSTRLEN) == NULL)) { + debuglog("Failed to log client_address: %s", strerror(errno)); + } + + debuglog("socket: %d. Address: %s\n", session->socket, client_address_text); + http_request_print(&session->http_request); +#endif +} + /** * Receive and Parse the Request for the current sandbox - * @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space + * @return 0 if message parsing complete, -1 on error, -ENOMEM if buffers run out of space, -3 EAGAIN if would block */ static inline int -http_session_receive_request(struct http_session *session, http_session_receive_request_egain_cb on_eagain) +http_session_receive_request(struct http_session *session, void_star_cb on_eagain) { assert(session != NULL); assert(session->request_buffer.capacity > 0); assert(session->request_buffer.length <= session->request_buffer.capacity); + assert(session->state == HTTP_SESSION_INITIALIZED || session->state == HTTP_SESSION_RECEIVE_REQUEST_BLOCKED); - int rc = 0; + session->state = HTTP_SESSION_RECEIVING_REQUEST; - http_session_parser_init(session); + int rc = 0; while (!session->http_request.message_end) { /* If we know the header size and content-length, resize exactly. Otherwise double */ @@ -361,15 +376,30 @@ http_session_receive_request(struct http_session *session, http_session_receive_ if (rc != 0) goto err_nobufs; } - ssize_t bytes_received = http_session_receive_raw(session, on_eagain); - if (bytes_received == -EAGAIN) goto err_eagain; - if (bytes_received == -1) goto err; + ssize_t bytes_received = + tcp_session_recv(session->socket, + (char *)&session->request_buffer.buffer[session->request_buffer.length], + session->request_buffer.capacity - session->request_buffer.length, on_eagain, + session); + if (unlikely(bytes_received == -EAGAIN)) + goto err_eagain; + else if (unlikely(bytes_received < 0)) + goto err; + /* If we received an EOF before we were able to parse a complete HTTP message, request is malformed */ + else if (unlikely(bytes_received == 0 && !session->http_request.message_end)) + goto err; + + assert(bytes_received > 0); + assert(session->request_buffer.length < session->request_buffer.capacity); + + session->request_buffer.length += bytes_received; ssize_t bytes_parsed = http_session_parse(session, bytes_received); if (bytes_parsed == -1) goto err; } assert(session->http_request.message_end == true); + session->state = HTTP_SESSION_RECEIVED_REQUEST; http_session_log_query_params(session); @@ -377,12 +407,14 @@ http_session_receive_request(struct http_session *session, http_session_receive_ done: return rc; err_eagain: - rc = -3; + rc = -EAGAIN; goto done; err_nobufs: - rc = -2; + http_session_log_malformed_request(session); + rc = -ENOMEM; goto done; err: + http_session_log_malformed_request(session); rc = -1; goto done; } @@ -416,3 +448,31 @@ http_session_write_response(struct http_session *session, const uint8_t *source, DONE: return rc; } + +static inline void +http_session_send_response(struct http_session *session, void_star_cb on_eagain) +{ + int rc = http_session_send_response_header(session, on_eagain); + /* session blocked and registered to epoll so continue to next handle */ + if (unlikely(rc == -EAGAIN)) { + goto DONE; + } else if (unlikely(rc < 0)) { + goto CLOSE; + } + + rc = http_session_send_response_body(session, on_eagain); + /* session blocked and registered to epoll so continue to next handle */ + if (unlikely(rc == -EAGAIN)) { + goto DONE; + } else if (unlikely(rc < 0)) { + goto CLOSE; + } + + session->response_sent_timestamp = __getcycles(); + +CLOSE: + http_session_close(session); + http_session_free(session); +DONE: + return; +} diff --git a/runtime/include/http_total.h b/runtime/include/http_total.h index 2f74cad..09fcf6f 100644 --- a/runtime/include/http_total.h +++ b/runtime/include/http_total.h @@ -35,7 +35,7 @@ http_total_increment_request() } static inline void -http_total_increment_2xx() +http_total_increment_2XX() { #ifdef LOG_TOTAL_REQS_RESPS atomic_fetch_add(&http_total_2XX, 1); diff --git a/runtime/include/listener_thread.h b/runtime/include/listener_thread.h index 8621eeb..8a91274 100644 --- a/runtime/include/listener_thread.h +++ b/runtime/include/listener_thread.h @@ -4,6 +4,7 @@ #include #include "generic_thread.h" +#include "http_session.h" #include "module.h" #define LISTENER_THREAD_CORE_ID 1 @@ -12,6 +13,7 @@ extern pthread_t listener_thread_id; void listener_thread_initialize(void); noreturn void *listener_thread_main(void *dummy); +void listener_thread_register_http_session(struct http_session *http); /** * Used to determine if running in the context of a listener thread diff --git a/runtime/include/sandbox_perf_log.h b/runtime/include/sandbox_perf_log.h index e76261a..6becea3 100644 --- a/runtime/include/sandbox_perf_log.h +++ b/runtime/include/sandbox_perf_log.h @@ -29,7 +29,7 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox) /* If the log was not defined by an environment variable, early out */ if (sandbox_perf_log == NULL) return; - uint64_t queued_duration = sandbox->timestamp_of.allocation - sandbox->timestamp_of.request_arrival; + uint64_t queued_duration = sandbox->timestamp_of.dispatched - sandbox->timestamp_of.allocation; /* * Assumption: A sandbox is never able to free pages. If linear memory management diff --git a/runtime/include/sandbox_set_as_allocated.h b/runtime/include/sandbox_set_as_allocated.h index f77b215..89a5fe7 100644 --- a/runtime/include/sandbox_set_as_allocated.h +++ b/runtime/include/sandbox_set_as_allocated.h @@ -24,6 +24,7 @@ sandbox_set_as_allocated(struct sandbox *sandbox) /* State Change Bookkeeping */ assert(now > sandbox->timestamp_of.last_state_change); + sandbox->timestamp_of.allocation = now; sandbox->timestamp_of.last_state_change = now; sandbox_state_history_init(&sandbox->state_history); sandbox_state_history_append(&sandbox->state_history, SANDBOX_ALLOCATED); diff --git a/runtime/include/sandbox_set_as_error.h b/runtime/include/sandbox_set_as_error.h index 326eba9..4525264 100644 --- a/runtime/include/sandbox_set_as_error.h +++ b/runtime/include/sandbox_set_as_error.h @@ -4,6 +4,7 @@ #include #include "arch/getcycles.h" +#include "listener_thread.h" #include "local_runqueue.h" #include "sandbox_state.h" #include "sandbox_functions.h" @@ -36,8 +37,6 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state) case SANDBOX_RUNNING_SYS: { local_runqueue_delete(sandbox); sandbox_free_linear_memory(sandbox); - http_session_free(sandbox->http); - sandbox->http = NULL; break; } default: { @@ -58,6 +57,12 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state) /* Admissions Control Post Processing */ admissions_control_subtract(sandbox->admissions_estimate); + /* Return HTTP session to listener core to be written back to client */ + http_session_set_response_header(sandbox->http, 500, NULL, 0); + sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session); + sandbox->http = NULL; + /* Terminal State Logging */ sandbox_perf_log_print_entry(sandbox); sandbox_summarize_page_allocations(sandbox); diff --git a/runtime/include/sandbox_set_as_returned.h b/runtime/include/sandbox_set_as_returned.h index 4032e39..88cc472 100644 --- a/runtime/include/sandbox_set_as_returned.h +++ b/runtime/include/sandbox_set_as_returned.h @@ -4,6 +4,7 @@ #include #include "arch/getcycles.h" +#include "listener_thread.h" #include "local_runqueue.h" #include "panic.h" #include "sandbox_functions.h" @@ -30,7 +31,6 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state) switch (last_state) { case SANDBOX_RUNNING_SYS: { - sandbox->timestamp_of.response = now; local_runqueue_delete(sandbox); sandbox_free_linear_memory(sandbox); break; @@ -50,6 +50,12 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state) sandbox_state_totals_increment(SANDBOX_RETURNED); sandbox_state_totals_decrement(last_state); + http_session_set_response_header(sandbox->http, 200, sandbox->route->response_content_type, + sandbox->http->response_buffer.length); + sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session); + sandbox->http = NULL; + /* State Change Hooks */ sandbox_state_transition_from_hook(sandbox, last_state); sandbox_state_transition_to_hook(sandbox, SANDBOX_RETURNED); diff --git a/runtime/include/sandbox_set_as_runnable.h b/runtime/include/sandbox_set_as_runnable.h index 8e28875..6b404d5 100644 --- a/runtime/include/sandbox_set_as_runnable.h +++ b/runtime/include/sandbox_set_as_runnable.h @@ -30,6 +30,7 @@ sandbox_set_as_runnable(struct sandbox *sandbox, sandbox_state_t last_state) switch (last_state) { case SANDBOX_INITIALIZED: { + sandbox->timestamp_of.dispatched = now; local_runqueue_add(sandbox); break; } diff --git a/runtime/include/sandbox_types.h b/runtime/include/sandbox_types.h index f9f2ca1..796c4b3 100644 --- a/runtime/include/sandbox_types.h +++ b/runtime/include/sandbox_types.h @@ -21,9 +21,8 @@ struct sandbox_timestamps { uint64_t last_state_change; /* Used for bookkeeping of actual execution time */ - uint64_t request_arrival; /* Timestamp when request is received */ uint64_t allocation; /* Timestamp when sandbox is allocated */ - uint64_t response; /* Timestamp when response is sent */ + uint64_t dispatched; /* Timestamp when a sandbox is first added to a worker's runqueue */ uint64_t completion; /* Timestamp when sandbox runs to completion */ #ifdef LOG_SANDBOX_MEMORY_PROFILE uint32_t page_allocations[SANDBOX_PAGE_ALLOCATION_TIMESTAMP_COUNT]; diff --git a/runtime/include/scheduler_execute_epoll_loop.h b/runtime/include/scheduler_execute_epoll_loop.h index 5695afc..3e35b9e 100644 --- a/runtime/include/scheduler_execute_epoll_loop.h +++ b/runtime/include/scheduler_execute_epoll_loop.h @@ -60,8 +60,6 @@ scheduler_execute_epoll_loop(void) case SANDBOX_ERROR: panic("Expected to have closed socket"); default: - http_session_send_err_oneshot(sandbox->http, 503); - http_session_close(sandbox->http); sandbox_set_as_error(sandbox, sandbox->state); } } else { diff --git a/runtime/include/tcp_session.h b/runtime/include/tcp_session.h index 6aa9bce..8baab85 100644 --- a/runtime/include/tcp_session.h +++ b/runtime/include/tcp_session.h @@ -30,55 +30,57 @@ tcp_session_close(int client_socket, struct sockaddr *client_address) } } -typedef void (*void_cb)(void); +typedef void (*void_star_cb)(void *); /** * Writes buffer to the client socket * @param client_socket - the client * @param buffer - buffer to write to socket * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out - * @returns 0 on success, -1 on error. + * @returns nwritten on success, -errno, -EAGAIN on block */ -static inline int -tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain) +static inline ssize_t +tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr) { - int rc; + assert(buffer != NULL); + assert(buffer_len > 0); - size_t cursor = 0; - - while (cursor < buffer_len) { - ssize_t sent = write(client_socket, &buffer[cursor], buffer_len - cursor); - if (sent < 0) { - if (errno == EAGAIN) { - if (on_eagain == NULL) { - rc = -1; - goto done; - } - on_eagain(); - } else { - debuglog("Error sending to client: %s", strerror(errno)); - rc = -1; - goto done; - } + ssize_t sent = write(client_socket, buffer, buffer_len); + if (sent < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (on_eagain != NULL) on_eagain(dataptr); + return -EAGAIN; + } else { + return -errno; } + } - assert(sent > 0); - cursor += (size_t)sent; - }; - - rc = 0; -done: - return rc; + return sent; } /** - * Rejects request due to admission control or error - * @param client_socket - the client we are rejecting - * @param buffer - buffer to write to socket - * @returns 0 + * Writes buffer to the client socket + * @param client_socket - the client + * @param buffer - buffer to reach the socket into + * @param buffer_len - buffer to reach the socket into + * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out + * @returns nwritten on success, -errno on error, -eagain on block */ -static inline int -tcp_session_send_oneshot(int client_socket, const char *buffer, size_t buffer_len) +static inline ssize_t +tcp_session_recv(int client_socket, char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr) { - return tcp_session_send(client_socket, buffer, buffer_len, NULL); + assert(buffer != NULL); + assert(buffer_len > 0); + + ssize_t received = read(client_socket, buffer, buffer_len); + if (received < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (on_eagain != NULL) on_eagain(dataptr); + return -EAGAIN; + } else { + return -errno; + } + } + + return received; } diff --git a/runtime/include/wasm_stack.h b/runtime/include/wasm_stack.h index 1821172..95d5ac3 100644 --- a/runtime/include/wasm_stack.h +++ b/runtime/include/wasm_stack.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "ps_list.h" #include "types.h" @@ -124,6 +125,6 @@ wasm_stack_reinit(struct wasm_stack *wasm_stack) assert(wasm_stack->low == wasm_stack->buffer + /* guard page */ PAGE_SIZE); assert(wasm_stack->high == wasm_stack->low + wasm_stack->capacity); - memset(wasm_stack->low, 0, wasm_stack->capacity); + explicit_bzero(wasm_stack->low, wasm_stack->capacity); ps_list_init_d(wasm_stack); } diff --git a/runtime/src/current_sandbox.c b/runtime/src/current_sandbox.c index a0ca524..1d3f43e 100644 --- a/runtime/src/current_sandbox.c +++ b/runtime/src/current_sandbox.c @@ -46,7 +46,7 @@ current_sandbox_sleep() /** * @brief Switches from an executing sandbox to the worker thread base context * - * This places the current sandbox on the completion queue if in RETURNED state + * This places the current sandbox on the completion queue if in RETURNED or RUNNING_SYS state */ void current_sandbox_exit() @@ -81,42 +81,32 @@ current_sandbox_wasm_trap_handler(int trapno) struct sandbox *sandbox = current_sandbox_get(); sandbox_syscall(sandbox); - struct http_session *session = sandbox->http; - switch (trapno) { case WASM_TRAP_INVALID_INDEX: error_message = "WebAssembly Trap: Invalid Index\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_MISMATCHED_TYPE: error_message = "WebAssembly Trap: Mismatched Type\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_PROTECTED_CALL_STACK_OVERFLOW: error_message = "WebAssembly Trap: Protected Call Stack Overflow\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_OUT_OF_BOUNDS_LINEAR_MEMORY: error_message = "WebAssembly Trap: Out of Bounds Linear Memory Access\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_ILLEGAL_ARITHMETIC_OPERATION: error_message = "WebAssembly Trap: Illegal Arithmetic Operation\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; case WASM_TRAP_UNREACHABLE: error_message = "WebAssembly Trap: Unreachable Instruction\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; default: error_message = "WebAssembly Trap: Unknown Trapno\n"; - http_session_send_err(session, 500, current_sandbox_sleep); break; } debuglog("%s", error_message); worker_thread_epoll_remove_sandbox(sandbox); - http_session_close(sandbox->http); generic_thread_dump_lock_overhead(); current_sandbox_exit(); assert(0); @@ -165,7 +155,6 @@ current_sandbox_init() err: debuglog("%s", error_message); worker_thread_epoll_remove_sandbox(sandbox); - http_session_close(sandbox->http); generic_thread_dump_lock_overhead(); current_sandbox_exit(); return NULL; @@ -181,24 +170,12 @@ current_sandbox_fini() sandbox_syscall(sandbox); sandbox->timestamp_of.completion = __getcycles(); - sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.request_arrival; - - /* Retrieve the result, construct the HTTP response, and send to client */ - if (http_session_send_response(sandbox->http, sandbox->route->response_content_type, current_sandbox_sleep) - < 0) { - error_message = "Unable to build and send client response\n"; - goto err; - }; - - http_total_increment_2xx(); - - sandbox->timestamp_of.response = __getcycles(); + sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.allocation; assert(sandbox->state == SANDBOX_RUNNING_SYS); worker_thread_epoll_remove_sandbox(sandbox); done: - http_session_close(sandbox->http); sandbox_set_as_returned(sandbox, SANDBOX_RUNNING_SYS); /* Cleanup connection and exit sandbox */ diff --git a/runtime/src/listener_thread.c b/runtime/src/listener_thread.c index aee05ce..4553487 100644 --- a/runtime/src/listener_thread.c +++ b/runtime/src/listener_thread.c @@ -12,6 +12,18 @@ #include "tenant.h" #include "tenant_functions.h" +static void listener_thread_unregister_http_session(struct http_session *http); +static void panic_on_epoll_error(struct epoll_event *evt); + +static void on_client_socket_epoll_event(struct epoll_event *evt); +static void on_tenant_socket_epoll_event(struct epoll_event *evt); +static void on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant); +static void on_client_request_receiving(struct http_session *session); +static void on_client_request_received(struct http_session *session); +static void on_client_response_header_sending(struct http_session *session); +static void on_client_response_body_sending(struct http_session *session); +static void on_client_response_sent(struct http_session *session); + /* * Descriptor of the epoll instance used to monitor the socket descriptors of registered * serverless modules. The listener cores listens for incoming client requests through this. @@ -61,15 +73,32 @@ listener_thread_register_http_session(struct http_session *http) int rc = 0; struct epoll_event accept_evt; accept_evt.data.ptr = (void *)http; - accept_evt.events = EPOLLIN; - epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt); + switch (http->state) { + case HTTP_SESSION_RECEIVING_REQUEST: + accept_evt.events = EPOLLIN; + http->state = HTTP_SESSION_RECEIVE_REQUEST_BLOCKED; + break; + case HTTP_SESSION_SENDING_RESPONSE_HEADER: + accept_evt.events = EPOLLOUT; + http->state = HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED; + break; + case HTTP_SESSION_SENDING_RESPONSE: + accept_evt.events = EPOLLOUT; + http->state = HTTP_SESSION_SEND_RESPONSE_BLOCKED; + break; + default: + panic("Invalid HTTP Session State: %d\n", http->state); + } + + rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt); + if (rc != 0) { panic("Failed to add http session to listener thread epoll\n"); } } /** * @brief Registers a serverless tenant on the listener thread's epoll descriptor **/ -void +static void listener_thread_unregister_http_session(struct http_session *http) { assert(http != NULL); @@ -78,11 +107,13 @@ listener_thread_unregister_http_session(struct http_session *http) panic("Attempting to unregister an http session before listener thread initialization"); } - epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL); + int rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL); + if (rc != 0) { panic("Failed to remove http session from listener thread epoll\n"); } } /** * @brief Registers a serverless tenant on the listener thread's epoll descriptor + * Assumption: We never have to unregister a tenant **/ int listener_thread_register_tenant(struct tenant *tenant) @@ -117,121 +148,68 @@ panic_on_epoll_error(struct epoll_event *evt) } static void -handle_tcp_requests(struct epoll_event *evt) +on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant) { - assert((evt->events & EPOLLIN) == EPOLLIN); + uint64_t request_arrival_timestamp = __getcycles(); - /* Unpack tenant from epoll event */ - struct tenant *tenant = (struct tenant *)evt->data.ptr; - assert(tenant); - - /* Accept Client Request as a nonblocking socket, saving address information */ - struct sockaddr_in client_address; - socklen_t address_length = sizeof(client_address); + http_total_increment_request(); - /* Accept as many requests as possible, returning when we would have blocked */ - while (true) { - int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address, - &address_length, SOCK_NONBLOCK); - if (unlikely(client_socket < 0)) { - if (errno == EWOULDBLOCK || errno == EAGAIN) return; - - panic("accept4: %s", strerror(errno)); - } - - uint64_t request_arrival_timestamp = __getcycles(); - - http_total_increment_request(); - - /* Allocate HTTP Session */ - struct http_session *session = http_session_alloc(client_socket, - (const struct sockaddr *)&client_address, tenant, - request_arrival_timestamp); - - /* Receive HTTP Request */ - int rc = http_session_receive_request(session, listener_thread_register_http_session); - if (rc == -3) { - continue; - } else if (rc == -2) { - debuglog("Request size exceeded Buffer\n"); - http_session_send_err_oneshot(session, 413); - continue; - } else if (rc == -1) { - http_session_send_err_oneshot(session, 400); - continue; - } - - assert(session->http_request.message_end); - - /* Route to sandbox */ - struct route *route = http_router_match_route(&tenant->router, session->http_request.full_url); - if (route == NULL) { - http_session_send_err_oneshot(session, 404); - continue; - } - - /* - * Perform admissions control. - * If 0, workload was rejected, so close with 429 "Too Many Requests" and continue - * TODO: Consider providing a Retry-After header - */ - uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); - if (work_admitted == 0) { - http_session_send_err_oneshot(session, 429); - continue; - } - - /* Allocate a Sandbox */ - struct sandbox *sandbox = sandbox_alloc(route->module, session, route, tenant, work_admitted); - if (unlikely(sandbox == NULL)) { - http_session_send_err_oneshot(session, 503); - continue; - } - - /* If the global request scheduler is full, return a 429 to the client - */ - sandbox = global_request_scheduler_add(sandbox); - if (unlikely(sandbox == NULL)) { - http_session_send_err_oneshot(session, 429); - sandbox_free(sandbox); - continue; - } + /* Allocate HTTP Session */ + struct http_session *session = http_session_alloc(client_socket, (const struct sockaddr *)&client_address, + tenant, request_arrival_timestamp); + if (likely(session != NULL)) { + on_client_request_receiving(session); + return; + } else { + /* Failed to allocate memory */ + debuglog("Failed to allocate http session\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 500, NULL, 0); + on_client_response_header_sending(session); + return; } } static void -resume_blocked_read(struct epoll_event *evt) +on_client_request_receiving(struct http_session *session) { - assert((evt->events & EPOLLIN) == EPOLLIN); - - /* Unpack http session from epoll event */ - struct http_session *session = (struct http_session *)evt->data.ptr; - assert(session); - /* Read HTTP request */ - int rc = http_session_receive_request(session, listener_thread_register_http_session); - if (rc == -3) { + int rc = http_session_receive_request(session, (void_star_cb)listener_thread_register_http_session); + if (likely(rc == 0)) { + on_client_request_received(session); return; - } else if (rc == -2) { - debuglog("Request size exceeded Buffer\n"); - /* Request size exceeded Buffer, send 413 Payload Too Large */ - listener_thread_unregister_http_session(session); - http_session_send_err_oneshot(session, 413); + } else if (unlikely(rc == -EAGAIN)) { + /* session blocked and registered to epoll so continue to next handle */ return; - } else if (rc == -1) { - listener_thread_unregister_http_session(session); - http_session_send_err_oneshot(session, 400); + } else if (unlikely(rc == -ENOMEM)) { + /* Failed to grow request buffer */ + debuglog("Failed to grow http request buffer\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 500, NULL, 0); + on_client_response_header_sending(session); + return; + } else if (rc < 0) { + debuglog("Failed to receive or parse request\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 400, NULL, 0); + on_client_response_header_sending(session); return; } - assert(session->http_request.message_end); + assert(0); +} - /* We read session to completion, so can remove from epoll */ - listener_thread_unregister_http_session(session); +static void +on_client_request_received(struct http_session *session) +{ + assert(session->state == HTTP_SESSION_RECEIVED_REQUEST); struct route *route = http_router_match_route(&session->tenant->router, session->http_request.full_url); if (route == NULL) { - http_session_send_err_oneshot(session, 404); + debuglog("Did not match any routes\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 404, NULL, 0); + on_client_response_header_sending(session); return; } @@ -242,21 +220,132 @@ resume_blocked_read(struct epoll_event *evt) */ uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); if (work_admitted == 0) { - http_session_send_err_oneshot(session, 429); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 429, NULL, 0); + on_client_response_header_sending(session); return; } /* Allocate a Sandbox */ + session->state = HTTP_SESSION_EXECUTING; struct sandbox *sandbox = sandbox_alloc(route->module, session, route, session->tenant, work_admitted); if (unlikely(sandbox == NULL)) { - http_session_send_err_oneshot(session, 503); + debuglog("Failed to allocate sandbox\n"); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 500, NULL, 0); + on_client_response_header_sending(session); return; } /* If the global request scheduler is full, return a 429 to the client */ if (unlikely(global_request_scheduler_add(sandbox) == NULL)) { - http_session_send_err_oneshot(session, 429); + debuglog("Failed to add sandbox to global queue\n"); sandbox_free(sandbox); + session->state = HTTP_SESSION_EXECUTION_COMPLETE; + http_session_set_response_header(session, 429, NULL, 0); + on_client_response_header_sending(session); + } +} + +static void +on_client_response_header_sending(struct http_session *session) +{ + assert(session->state = HTTP_SESSION_EXECUTION_COMPLETE); + session->state = HTTP_SESSION_SENDING_RESPONSE_HEADER; + + int rc = http_session_send_response_header(session, (void_star_cb)listener_thread_register_http_session); + if (likely(rc == 0)) { + on_client_response_body_sending(session); + return; + } else if (unlikely(rc == -EAGAIN)) { + /* session blocked and registered to epoll so continue to next handle */ + return; + } else if (rc < 0) { + http_session_close(session); + http_session_free(session); + return; + } +} + +static void +on_client_response_body_sending(struct http_session *session) +{ + /* Read HTTP request */ + int rc = http_session_send_response_body(session, (void_star_cb)listener_thread_register_http_session); + if (likely(rc == 0)) { + on_client_response_sent(session); + return; + } else if (unlikely(rc == -EAGAIN)) { + /* session blocked and registered to epoll so continue to next handle */ + return; + } else if (unlikely(rc < 0)) { + http_session_close(session); + http_session_free(session); + return; + } +} + +static void +on_client_response_sent(struct http_session *session) +{ + session->response_sent_timestamp = __getcycles(); + + http_session_close(session); + http_session_free(session); + return; +} + +static void +on_tenant_socket_epoll_event(struct epoll_event *evt) +{ + assert((evt->events & EPOLLIN) == EPOLLIN); + + struct tenant *tenant = evt->data.ptr; + assert(tenant); + + /* Accept Client Request as a nonblocking socket, saving address information */ + struct sockaddr_in client_address; + socklen_t address_length = sizeof(client_address); + + /* Accept as many clients requests as possible, returning when we would have blocked */ + while (true) { + int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address, + &address_length, SOCK_NONBLOCK); + if (unlikely(client_socket < 0)) { + if (errno == EWOULDBLOCK || errno == EAGAIN) return; + + panic("accept4: %s", strerror(errno)); + } + + on_client_request_arrival(client_socket, (const struct sockaddr *)&client_address, tenant); + } +} + +static void +on_client_socket_epoll_event(struct epoll_event *evt) +{ + assert(evt); + + struct http_session *session = evt->data.ptr; + assert(session); + + listener_thread_unregister_http_session(session); + + switch (session->state) { + case HTTP_SESSION_RECEIVE_REQUEST_BLOCKED: + assert((evt->events & EPOLLIN) == EPOLLIN); + on_client_request_receiving(session); + break; + case HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED: + assert((evt->events & EPOLLOUT) == EPOLLOUT); + on_client_response_header_sending(session); + break; + case HTTP_SESSION_SEND_RESPONSE_BLOCKED: + assert((evt->events & EPOLLOUT) == EPOLLOUT); + on_client_response_body_sending(session); + break; + default: + panic("Invalid HTTP Session State"); } } @@ -298,9 +387,9 @@ listener_thread_main(void *dummy) panic_on_epoll_error(&epoll_events[i]); if (tenant_database_find_by_ptr(epoll_events[i].data.ptr) != NULL) { - handle_tcp_requests(&epoll_events[i]); + on_tenant_socket_epoll_event(&epoll_events[i]); } else { - resume_blocked_read(&epoll_events[i]); + on_client_socket_epoll_event(&epoll_events[i]); } } generic_thread_dump_lock_overhead(); diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index 3b7b7d5..384777b 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -129,7 +129,6 @@ err_stack_allocation_failed: err_memory_allocation_failed: err_globals_allocation_failed: err_http_allocation_failed: - http_session_send_err_oneshot(sandbox->http, 503); sandbox_set_as_error(sandbox, SANDBOX_ALLOCATED); perror(error_message); rc = -1; @@ -154,8 +153,7 @@ sandbox_init(struct sandbox *sandbox, struct module *module, struct http_session sandbox->tenant = tenant; sandbox->route = route; - sandbox->timestamp_of.request_arrival = session->request_arrival_timestamp; - sandbox->absolute_deadline = session->request_arrival_timestamp + sandbox->route->relative_deadline; + sandbox->absolute_deadline = sandbox->timestamp_of.allocation + sandbox->route->relative_deadline; /* * Admissions Control State @@ -205,13 +203,15 @@ sandbox_deinit(struct sandbox *sandbox) assert(sandbox != current_sandbox_get()); assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE); + /* Assumption: HTTP session was migrated to listener core */ + assert(sandbox->http == NULL); + module_release(sandbox->module); /* Linear Memory and Guard Page should already have been munmaped and set to NULL */ assert(sandbox->memory == NULL); if (likely(sandbox->stack != NULL)) sandbox_free_stack(sandbox); - if (likely(sandbox->http != NULL)) http_session_free(sandbox->http); if (likely(sandbox->globals.buffer != NULL)) sandbox_free_globals(sandbox); if (likely(sandbox->wasi_context != NULL)) wasi_context_destroy(sandbox->wasi_context); } diff --git a/tests/traps/Makefile b/tests/traps/Makefile index e3285b0..c609122 100644 --- a/tests/traps/Makefile +++ b/tests/traps/Makefile @@ -31,6 +31,7 @@ debug: sledgert trap_divzero LD_LIBRARY_PATH=${SLEDGE_BINARY_DIR} gdb ${SLEDGE_BINARY_DIR}/sledgert \ --eval-command="handle SIGUSR1 noprint nostop" \ --eval-command="handle SIGPIPE noprint nostop" \ + --eval-command="handle SIGFPE noprint nostop" \ --eval-command="set pagination off" \ --eval-command="run spec.json" @@ -38,7 +39,7 @@ client-ok: echo "1" | http :10000/divide client-trap: - echo "0" | http :10000/divide + echo "0" | http --timeout 3600 :10000/divide client-trap2: echo "-1" | http :10000/divide