refactor: no http_session in sandbox lifetime.

master
Sean McBride 3 years ago
parent b26f2ca597
commit bca75a9dd4

@ -13,17 +13,12 @@
#define HTTP_MAX_QUERY_PARAM_COUNT 16 #define HTTP_MAX_QUERY_PARAM_COUNT 16
#define HTTP_MAX_QUERY_PARAM_LENGTH 32 #define HTTP_MAX_QUERY_PARAM_LENGTH 32
#define HTTP_RESPONSE_200_TEMPLATE \ #define HTTP_RESPONSE_200_OK \
"HTTP/1.1 200 OK\r\n" \ "HTTP/1.1 200 OK\r\n" \
"Server: SLEdge\r\n" \ "Server: SLEdge\r\n" \
"Connection: close\r\n" \ "Connection: close\r\n" \
"Content-Type: %s\r\n" \
"Content-Length: %lu\r\n" \
"\r\n" "\r\n"
/* The sum of format specifier characters in the template above */
#define HTTP_RESPONSE_200_TEMPLATE_FORMAT_SPECIFIER_LENGTH 5
#define HTTP_RESPONSE_400_BAD_REQUEST \ #define HTTP_RESPONSE_400_BAD_REQUEST \
"HTTP/1.1 400 Bad Request\r\n" \ "HTTP/1.1 400 Bad Request\r\n" \
"Server: SLEdge\r\n" \ "Server: SLEdge\r\n" \
@ -60,18 +55,16 @@
"Connection: close\r\n" \ "Connection: close\r\n" \
"\r\n" "\r\n"
static inline int
http_header_200_write(int fd, const char *content_type, size_t content_length)
{
return dprintf(fd, HTTP_RESPONSE_200_TEMPLATE, content_type, content_length);
}
static inline const char * static inline const char *
http_header_build(int status_code) http_header_build(int status_code)
{ {
const char *response; const char *response;
int rc; int rc;
switch (status_code) { switch (status_code) {
case 200:
response = HTTP_RESPONSE_200_OK;
http_total_increment_2XX();
break;
case 400: case 400:
response = HTTP_RESPONSE_400_BAD_REQUEST; response = HTTP_RESPONSE_400_BAD_REQUEST;
http_total_increment_4XX(); http_total_increment_4XX();
@ -103,10 +96,12 @@ http_header_build(int status_code)
return response; return response;
} }
static inline int static inline size_t
http_header_len(int status_code) http_header_len(int status_code)
{ {
switch (status_code) { switch (status_code) {
case 200:
return strlen(HTTP_RESPONSE_200_OK);
case 400: case 400:
return strlen(HTTP_RESPONSE_400_BAD_REQUEST); return strlen(HTTP_RESPONSE_400_BAD_REQUEST);
case 404: case 404:

@ -22,19 +22,54 @@
#define u8 uint8_t #define u8 uint8_t
VEC(u8) VEC(u8)
enum http_session_state
{
HTTP_SESSION_UNINITIALIZED = 0,
HTTP_SESSION_INITIALIZED,
HTTP_SESSION_RECEIVING_REQUEST,
HTTP_SESSION_RECEIVE_REQUEST_BLOCKED,
HTTP_SESSION_RECEIVED_REQUEST,
HTTP_SESSION_EXECUTING,
HTTP_SESSION_EXECUTION_COMPLETE,
HTTP_SESSION_SENDING_RESPONSE_HEADER,
HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED,
HTTP_SESSION_SENDING_RESPONSE,
HTTP_SESSION_SEND_RESPONSE_BLOCKED,
HTTP_SESSION_SENT_RESPONSE
};
struct http_session { struct http_session {
struct sockaddr client_address; /* client requesting connection! */ enum http_session_state state;
int socket; struct sockaddr client_address; /* client requesting connection! */
struct http_parser http_parser; int socket;
struct http_request http_request; struct http_parser http_parser;
struct vec_u8 request_buffer; struct http_request http_request;
struct vec_u8 response_buffer; struct vec_u8 request_buffer;
struct tenant *tenant; /* Backlink required when read blocks on listener core */ const char *response_header;
uint64_t request_arrival_timestamp; size_t response_header_length;
size_t response_header_written;
struct vec_u8 response_buffer;
size_t response_buffer_written;
struct tenant *tenant; /* Backlink required when read blocks on listener core */
uint64_t request_arrival_timestamp;
}; };
/** /**
* @param session sandbox that we want to init * Initalize state associated with an http parser
* Because the http_parser structure uses pointers to the request buffer, if realloc moves the request
* buffer, this should be called to clear stale state to force parsing to restart
*/
static inline void
http_session_parser_init(struct http_session *session)
{
memset(&session->http_request, 0, sizeof(struct http_request));
http_parser_init(&session->http_parser, HTTP_REQUEST);
/* Set the session as the data the http-parser has access to */
session->http_parser.data = &session->http_request;
}
/**
* @param session session that we want to init
* @returns 0 on success, -1 on error * @returns 0 on success, -1 on error
*/ */
static inline int static inline int
@ -42,6 +77,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str
struct tenant *tenant, uint64_t request_arrival_timestamp) struct tenant *tenant, uint64_t request_arrival_timestamp)
{ {
assert(session != NULL); assert(session != NULL);
assert(session->state == HTTP_SESSION_UNINITIALIZED);
assert(socket_descriptor >= 0); assert(socket_descriptor >= 0);
assert(socket_address != NULL); assert(socket_address != NULL);
@ -50,12 +86,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str
session->request_arrival_timestamp = request_arrival_timestamp; session->request_arrival_timestamp = request_arrival_timestamp;
memcpy(&session->client_address, socket_address, sizeof(struct sockaddr)); memcpy(&session->client_address, socket_address, sizeof(struct sockaddr));
http_parser_init(&session->http_parser, HTTP_REQUEST); http_session_parser_init(session);
/* Set the http_request member as the data pointer the http_parser callbacks receive */
session->http_parser.data = &session->http_request;
memset(&session->http_request, 0, sizeof(struct http_request));
int rc = vec_u8_init(&session->request_buffer, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE); int rc = vec_u8_init(&session->request_buffer, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE);
if (rc < 0) return -1; if (rc < 0) return -1;
@ -63,6 +94,8 @@ http_session_init(struct http_session *session, int socket_descriptor, const str
/* Defer allocating response until we've matched a route */ /* Defer allocating response until we've matched a route */
session->response_buffer.buffer = NULL; session->response_buffer.buffer = NULL;
session->state = HTTP_SESSION_INITIALIZED;
return 0; return 0;
} }
@ -123,22 +156,20 @@ http_session_free(struct http_session *session)
} }
/** /**
* Writes buffer to the client socket * Set Response Header
* @param session - the HTTP session we want to send a 500 to * @param session - the HTTP session we want to set the response header of
* @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out * @param status_code
* @returns 0 on success, -1 on error.
*/ */
static inline int static inline void
http_session_send_err(struct http_session *session, int status_code, void_cb on_eagain) http_session_set_response_header(struct http_session *session, int status_code)
{ {
assert(session != NULL); assert(session != NULL);
assert(status_code >= 100 && status_code <= 599); assert(status_code >= 100 && status_code <= 599);
return tcp_session_send(session->socket, http_header_build(status_code), http_header_len(status_code), session->response_header = http_header_build(status_code);
on_eagain); session->response_header_length = http_header_len(status_code);
} }
static inline void static inline void
http_session_close(struct http_session *session) http_session_close(struct http_session *session)
{ {
@ -148,52 +179,58 @@ http_session_close(struct http_session *session)
} }
/** /**
* Writes an HTTP error code to the TCP socket associated with the session * Writes an HTTP header to the client
* Closes without writing if TCP socket would have blocked * @param client_socket - the client
* Also cleans up the HTTP session * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out
* @returns 0 on success, -1 on error, -2 unused, -3 on eagain
*/ */
static inline int static inline int
http_session_send_err_oneshot(struct http_session *session, int status_code) http_session_send_response_header(struct http_session *session, void_star_cb on_eagain)
{ {
assert(session != NULL); assert(session != NULL);
assert(status_code >= 100 && status_code <= 599);
int rc = tcp_session_send_oneshot(session->socket, http_header_build(status_code), while (session->response_header_length > session->response_header_written) {
http_header_len(status_code)); ssize_t sent =
http_session_close(session); tcp_session_send(session->socket,
http_session_free(session); (const char *)&session->response_header[session->response_header_written],
session->response_header_length - session->response_header_written, on_eagain,
session);
if (sent < 0) {
return (int)sent;
} else {
session->response_header_written += (size_t)sent;
}
}
return rc; return 0;
} }
/**
* Writes an HTTP body to the client
* @param client_socket - the client
* @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out
* @returns 0 on success, -1 on error, -2 unused, -3 on eagain
*/
static inline int static inline int
http_session_send_response(struct http_session *session, const char *response_content_type, void_cb on_eagain) http_session_send_response_body(struct http_session *session, void_star_cb on_eagain)
{ {
assert(session != NULL); assert(session != NULL);
assert(session->response_buffer.buffer != NULL); assert(session->response_buffer.buffer != NULL);
assert(response_content_type != NULL);
int rc = 0;
struct vec_u8 *response_buffer = &session->response_buffer;
/* Send HTTP Response Header and Body */
rc = http_header_200_write(session->socket, response_content_type, response_buffer->length);
if (rc < 0) goto err;
rc = tcp_session_send(session->socket, (const char *)response_buffer->buffer, response_buffer->length, while (session->response_buffer_written < session->response_buffer.length) {
on_eagain); ssize_t sent =
if (rc < 0) goto err; tcp_session_send(session->socket,
(const char *)&session->response_buffer.buffer[session->response_buffer_written],
http_total_increment_2xx(); session->response_buffer.length - session->response_buffer_written, on_eagain,
rc = 0; session);
if (sent < 0) {
return (int)sent;
} else {
session->response_buffer_written += (size_t)sent;
}
}
done: return 0;
return rc;
err:
debuglog("Error sending to client: %s", strerror(errno));
rc = -1;
goto done;
} }
static inline bool static inline bool
@ -202,20 +239,6 @@ http_session_request_buffer_is_full(struct http_session *session)
return session->request_buffer.length == session->request_buffer.capacity; return session->request_buffer.length == session->request_buffer.capacity;
} }
/**
* Initalize state associated with an http parser
* Because the http_parser structure uses pointers to the request buffer, if realloc moves the request
* buffer, this should be called to clear stale state to force parsing to restart
*/
static inline void
http_session_parser_init(struct http_session *session)
{
memset(&session->http_request, 0, sizeof(struct http_request));
http_parser_init(&session->http_parser, HTTP_REQUEST);
/* Set the session as the data the http-parser has access to */
session->http_parser.data = &session->http_request;
}
static inline int static inline int
http_session_request_buffer_grow(struct http_session *session) http_session_request_buffer_grow(struct http_session *session)
{ {
@ -249,10 +272,11 @@ http_session_request_buffer_resize(struct http_session *session, int required_si
return 0; return 0;
} }
typedef void (*http_session_receive_request_egain_cb)(struct http_session *); typedef void (*http_session_cb)(struct http_session *);
/* TODO: Why is this not in TCP logic? */
static inline ssize_t static inline ssize_t
http_session_receive_raw(struct http_session *session, http_session_receive_request_egain_cb on_eagain) http_session_receive_raw(struct http_session *session, void_star_cb on_eagain)
{ {
assert(session->request_buffer.capacity > session->request_buffer.length); assert(session->request_buffer.capacity > session->request_buffer.length);
@ -333,18 +357,19 @@ http_session_log_query_params(struct http_session *session)
/** /**
* Receive and Parse the Request for the current sandbox * Receive and Parse the Request for the current sandbox
* @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space * @return 0 if message parsing complete, -1 on error, -2 if buffers run out of space, -3 EAGAIN
*/ */
static inline int static inline int
http_session_receive_request(struct http_session *session, http_session_receive_request_egain_cb on_eagain) http_session_receive_request(struct http_session *session, void_star_cb on_eagain)
{ {
assert(session != NULL); assert(session != NULL);
assert(session->request_buffer.capacity > 0); assert(session->request_buffer.capacity > 0);
assert(session->request_buffer.length <= session->request_buffer.capacity); assert(session->request_buffer.length <= session->request_buffer.capacity);
assert(session->state == HTTP_SESSION_INITIALIZED || session->state == HTTP_SESSION_RECEIVE_REQUEST_BLOCKED);
int rc = 0; session->state = HTTP_SESSION_RECEIVING_REQUEST;
http_session_parser_init(session); int rc = 0;
while (!session->http_request.message_end) { while (!session->http_request.message_end) {
/* If we know the header size and content-length, resize exactly. Otherwise double */ /* If we know the header size and content-length, resize exactly. Otherwise double */
@ -361,6 +386,7 @@ http_session_receive_request(struct http_session *session, http_session_receive_
if (rc != 0) goto err_nobufs; if (rc != 0) goto err_nobufs;
} }
/* TODO: Why is this a call to TCP receive? */
ssize_t bytes_received = http_session_receive_raw(session, on_eagain); ssize_t bytes_received = http_session_receive_raw(session, on_eagain);
if (bytes_received == -EAGAIN) goto err_eagain; if (bytes_received == -EAGAIN) goto err_eagain;
if (bytes_received == -1) goto err; if (bytes_received == -1) goto err;
@ -370,6 +396,7 @@ http_session_receive_request(struct http_session *session, http_session_receive_
} }
assert(session->http_request.message_end == true); assert(session->http_request.message_end == true);
session->state = HTTP_SESSION_RECEIVED_REQUEST;
http_session_log_query_params(session); http_session_log_query_params(session);
@ -416,3 +443,28 @@ http_session_write_response(struct http_session *session, const uint8_t *source,
DONE: DONE:
return rc; return rc;
} }
static inline void
http_session_send_response(struct http_session *session, void_star_cb on_eagain)
{
int rc = http_session_send_response_header(session, on_eagain);
if (unlikely(rc == -3)) {
/* session blocked and registered to epoll so continue to next handle */
return;
} else if (unlikely(rc == -1)) {
http_session_close(session);
return;
}
rc = http_session_send_response_body(session, on_eagain);
if (unlikely(rc == -3)) {
/* session blocked and registered to epoll so continue to next handle */
return;
} else if (unlikely(rc == -1)) {
http_session_close(session);
return;
}
http_session_close(session);
http_session_free(session);
}

@ -35,7 +35,7 @@ http_total_increment_request()
} }
static inline void static inline void
http_total_increment_2xx() http_total_increment_2XX()
{ {
#ifdef LOG_TOTAL_REQS_RESPS #ifdef LOG_TOTAL_REQS_RESPS
atomic_fetch_add(&http_total_2XX, 1); atomic_fetch_add(&http_total_2XX, 1);

@ -4,6 +4,7 @@
#include <stdnoreturn.h> #include <stdnoreturn.h>
#include "generic_thread.h" #include "generic_thread.h"
#include "http_session.h"
#include "module.h" #include "module.h"
#define LISTENER_THREAD_CORE_ID 1 #define LISTENER_THREAD_CORE_ID 1
@ -12,6 +13,7 @@ extern pthread_t listener_thread_id;
void listener_thread_initialize(void); void listener_thread_initialize(void);
noreturn void *listener_thread_main(void *dummy); noreturn void *listener_thread_main(void *dummy);
void listener_thread_register_http_session(struct http_session *http);
/** /**
* Used to determine if running in the context of a listener thread * Used to determine if running in the context of a listener thread

@ -4,6 +4,7 @@
#include <stdint.h> #include <stdint.h>
#include "arch/getcycles.h" #include "arch/getcycles.h"
#include "listener_thread.h"
#include "local_completion_queue.h" #include "local_completion_queue.h"
#include "local_runqueue.h" #include "local_runqueue.h"
#include "sandbox_state.h" #include "sandbox_state.h"
@ -37,8 +38,6 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
case SANDBOX_RUNNING_SYS: { case SANDBOX_RUNNING_SYS: {
local_runqueue_delete(sandbox); local_runqueue_delete(sandbox);
sandbox_free_linear_memory(sandbox); sandbox_free_linear_memory(sandbox);
http_session_free(sandbox->http);
sandbox->http = NULL;
break; break;
} }
default: { default: {
@ -59,6 +58,12 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
/* Admissions Control Post Processing */ /* Admissions Control Post Processing */
admissions_control_subtract(sandbox->admissions_estimate); admissions_control_subtract(sandbox->admissions_estimate);
/* Return HTTP session to listener core to be written back to client */
http_session_set_response_header(sandbox->http, 500);
sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session);
sandbox->http = NULL;
/* Terminal State Logging */ /* Terminal State Logging */
sandbox_perf_log_print_entry(sandbox); sandbox_perf_log_print_entry(sandbox);
sandbox_summarize_page_allocations(sandbox); sandbox_summarize_page_allocations(sandbox);

@ -4,6 +4,7 @@
#include <stdint.h> #include <stdint.h>
#include "arch/getcycles.h" #include "arch/getcycles.h"
#include "listener_thread.h"
#include "local_runqueue.h" #include "local_runqueue.h"
#include "panic.h" #include "panic.h"
#include "sandbox_functions.h" #include "sandbox_functions.h"
@ -50,6 +51,11 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox_state_totals_increment(SANDBOX_RETURNED); sandbox_state_totals_increment(SANDBOX_RETURNED);
sandbox_state_totals_decrement(last_state); sandbox_state_totals_decrement(last_state);
http_session_set_response_header(sandbox->http, 200);
sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session);
sandbox->http = NULL;
/* State Change Hooks */ /* State Change Hooks */
sandbox_state_transition_from_hook(sandbox, last_state); sandbox_state_transition_from_hook(sandbox, last_state);
sandbox_state_transition_to_hook(sandbox, SANDBOX_RETURNED); sandbox_state_transition_to_hook(sandbox, SANDBOX_RETURNED);

@ -61,8 +61,6 @@ scheduler_execute_epoll_loop(void)
case SANDBOX_ERROR: case SANDBOX_ERROR:
panic("Expected to have closed socket"); panic("Expected to have closed socket");
default: default:
http_session_send_err_oneshot(sandbox->http, 503);
http_session_close(sandbox->http);
sandbox_set_as_error(sandbox, sandbox->state); sandbox_set_as_error(sandbox, sandbox->state);
} }
} else { } else {

@ -30,55 +30,30 @@ tcp_session_close(int client_socket, struct sockaddr *client_address)
} }
} }
typedef void (*void_cb)(void); typedef void (*void_star_cb)(void *);
/** /**
* Writes buffer to the client socket * Writes buffer to the client socket
* @param client_socket - the client * @param client_socket - the client
* @param buffer - buffer to write to socket * @param buffer - buffer to write to socket
* @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out * @param on_eagain - cb to execute when client socket returns EAGAIN. If NULL, error out
* @returns 0 on success, -1 on error. * @returns nwritten on success, -1 on error, -2 unused, -3 on eagain
*/ */
static inline int static inline ssize_t
tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_cb on_eagain) tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr)
{ {
int rc; assert(buffer != NULL);
assert(buffer_len > 0);
size_t cursor = 0;
ssize_t sent = write(client_socket, buffer, buffer_len);
while (cursor < buffer_len) { if (sent < 0) {
ssize_t sent = write(client_socket, &buffer[cursor], buffer_len - cursor); if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (sent < 0) { if (on_eagain != NULL) on_eagain(dataptr);
if (errno == EAGAIN) { return -3;
if (on_eagain == NULL) { } else {
rc = -1; return -1;
goto done;
}
on_eagain();
} else {
debuglog("Error sending to client: %s", strerror(errno));
rc = -1;
goto done;
}
} }
}
assert(sent > 0); return sent;
cursor += (size_t)sent;
};
rc = 0;
done:
return rc;
}
/**
* Rejects request due to admission control or error
* @param client_socket - the client we are rejecting
* @param buffer - buffer to write to socket
* @returns 0
*/
static inline int
tcp_session_send_oneshot(int client_socket, const char *buffer, size_t buffer_len)
{
return tcp_session_send(client_socket, buffer, buffer_len, NULL);
} }

@ -46,7 +46,7 @@ current_sandbox_sleep()
/** /**
* @brief Switches from an executing sandbox to the worker thread base context * @brief Switches from an executing sandbox to the worker thread base context
* *
* This places the current sandbox on the completion queue if in RETURNED state * This places the current sandbox on the completion queue if in RETURNED or RUNNING_SYS state
*/ */
void void
current_sandbox_exit() current_sandbox_exit()
@ -86,37 +86,29 @@ current_sandbox_wasm_trap_handler(int trapno)
switch (trapno) { switch (trapno) {
case WASM_TRAP_INVALID_INDEX: case WASM_TRAP_INVALID_INDEX:
error_message = "WebAssembly Trap: Invalid Index\n"; error_message = "WebAssembly Trap: Invalid Index\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
case WASM_TRAP_MISMATCHED_TYPE: case WASM_TRAP_MISMATCHED_TYPE:
error_message = "WebAssembly Trap: Mismatched Type\n"; error_message = "WebAssembly Trap: Mismatched Type\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
case WASM_TRAP_PROTECTED_CALL_STACK_OVERFLOW: case WASM_TRAP_PROTECTED_CALL_STACK_OVERFLOW:
error_message = "WebAssembly Trap: Protected Call Stack Overflow\n"; error_message = "WebAssembly Trap: Protected Call Stack Overflow\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
case WASM_TRAP_OUT_OF_BOUNDS_LINEAR_MEMORY: case WASM_TRAP_OUT_OF_BOUNDS_LINEAR_MEMORY:
error_message = "WebAssembly Trap: Out of Bounds Linear Memory Access\n"; error_message = "WebAssembly Trap: Out of Bounds Linear Memory Access\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
case WASM_TRAP_ILLEGAL_ARITHMETIC_OPERATION: case WASM_TRAP_ILLEGAL_ARITHMETIC_OPERATION:
error_message = "WebAssembly Trap: Illegal Arithmetic Operation\n"; error_message = "WebAssembly Trap: Illegal Arithmetic Operation\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
case WASM_TRAP_UNREACHABLE: case WASM_TRAP_UNREACHABLE:
error_message = "WebAssembly Trap: Unreachable Instruction\n"; error_message = "WebAssembly Trap: Unreachable Instruction\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
default: default:
error_message = "WebAssembly Trap: Unknown Trapno\n"; error_message = "WebAssembly Trap: Unknown Trapno\n";
http_session_send_err(session, 500, current_sandbox_sleep);
break; break;
} }
debuglog("%s", error_message); debuglog("%s", error_message);
worker_thread_epoll_remove_sandbox(sandbox); worker_thread_epoll_remove_sandbox(sandbox);
http_session_close(sandbox->http);
generic_thread_dump_lock_overhead(); generic_thread_dump_lock_overhead();
current_sandbox_exit(); current_sandbox_exit();
assert(0); assert(0);
@ -165,7 +157,6 @@ current_sandbox_init()
err: err:
debuglog("%s", error_message); debuglog("%s", error_message);
worker_thread_epoll_remove_sandbox(sandbox); worker_thread_epoll_remove_sandbox(sandbox);
http_session_close(sandbox->http);
generic_thread_dump_lock_overhead(); generic_thread_dump_lock_overhead();
current_sandbox_exit(); current_sandbox_exit();
return NULL; return NULL;
@ -183,22 +174,12 @@ current_sandbox_fini()
sandbox->timestamp_of.completion = __getcycles(); sandbox->timestamp_of.completion = __getcycles();
sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.request_arrival; sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.request_arrival;
/* Retrieve the result, construct the HTTP response, and send to client */
if (http_session_send_response(sandbox->http, sandbox->route->response_content_type, current_sandbox_sleep)
< 0) {
error_message = "Unable to build and send client response\n";
goto err;
};
http_total_increment_2xx();
sandbox->timestamp_of.response = __getcycles(); sandbox->timestamp_of.response = __getcycles();
assert(sandbox->state == SANDBOX_RUNNING_SYS); assert(sandbox->state == SANDBOX_RUNNING_SYS);
worker_thread_epoll_remove_sandbox(sandbox); worker_thread_epoll_remove_sandbox(sandbox);
done: done:
http_session_close(sandbox->http);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING_SYS); sandbox_set_as_returned(sandbox, SANDBOX_RUNNING_SYS);
/* Cleanup connection and exit sandbox */ /* Cleanup connection and exit sandbox */

@ -12,6 +12,18 @@
#include "tenant.h" #include "tenant.h"
#include "tenant_functions.h" #include "tenant_functions.h"
static void listener_thread_unregister_http_session(struct http_session *http);
static void panic_on_epoll_error(struct epoll_event *evt);
static void on_client_socket_epoll_event(struct epoll_event *evt);
static void on_tenant_socket_epoll_event(struct epoll_event *evt);
static void on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant);
static void on_client_request_receiving(struct http_session *session);
static void on_client_request_received(struct http_session *session);
static void on_client_response_header_sending(struct http_session *session);
static void on_client_response_body_sending(struct http_session *session);
static void on_client_response_sent(struct http_session *session);
/* /*
* Descriptor of the epoll instance used to monitor the socket descriptors of registered * Descriptor of the epoll instance used to monitor the socket descriptors of registered
* serverless modules. The listener cores listens for incoming client requests through this. * serverless modules. The listener cores listens for incoming client requests through this.
@ -61,15 +73,32 @@ listener_thread_register_http_session(struct http_session *http)
int rc = 0; int rc = 0;
struct epoll_event accept_evt; struct epoll_event accept_evt;
accept_evt.data.ptr = (void *)http; accept_evt.data.ptr = (void *)http;
accept_evt.events = EPOLLIN;
epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt); switch (http->state) {
case HTTP_SESSION_RECEIVING_REQUEST:
accept_evt.events = EPOLLIN;
http->state = HTTP_SESSION_RECEIVE_REQUEST_BLOCKED;
break;
case HTTP_SESSION_SENDING_RESPONSE_HEADER:
accept_evt.events = EPOLLOUT;
http->state = HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED;
break;
case HTTP_SESSION_SENDING_RESPONSE:
accept_evt.events = EPOLLOUT;
http->state = HTTP_SESSION_SEND_RESPONSE_BLOCKED;
break;
default:
panic("Invalid HTTP Session State: %d\n", http->state);
}
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt);
if (rc != 0) { panic("Failed to add http session to listener thread epoll\n"); }
} }
/** /**
* @brief Registers a serverless tenant on the listener thread's epoll descriptor * @brief Registers a serverless tenant on the listener thread's epoll descriptor
**/ **/
void static void
listener_thread_unregister_http_session(struct http_session *http) listener_thread_unregister_http_session(struct http_session *http)
{ {
assert(http != NULL); assert(http != NULL);
@ -78,11 +107,13 @@ listener_thread_unregister_http_session(struct http_session *http)
panic("Attempting to unregister an http session before listener thread initialization"); panic("Attempting to unregister an http session before listener thread initialization");
} }
epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL); int rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL);
if (rc != 0) { panic("Failed to remove http session from listener thread epoll\n"); }
} }
/** /**
* @brief Registers a serverless tenant on the listener thread's epoll descriptor * @brief Registers a serverless tenant on the listener thread's epoll descriptor
* Assumption: We never have to unregister a tenant
**/ **/
int int
listener_thread_register_tenant(struct tenant *tenant) listener_thread_register_tenant(struct tenant *tenant)
@ -117,121 +148,68 @@ panic_on_epoll_error(struct epoll_event *evt)
} }
static void static void
handle_tcp_requests(struct epoll_event *evt) on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant)
{ {
assert((evt->events & EPOLLIN) == EPOLLIN); uint64_t request_arrival_timestamp = __getcycles();
/* Unpack tenant from epoll event */
struct tenant *tenant = (struct tenant *)evt->data.ptr;
assert(tenant);
/* Accept Client Request as a nonblocking socket, saving address information */
struct sockaddr_in client_address;
socklen_t address_length = sizeof(client_address);
/* Accept as many requests as possible, returning when we would have blocked */ http_total_increment_request();
while (true) {
int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address,
&address_length, SOCK_NONBLOCK);
if (unlikely(client_socket < 0)) {
if (errno == EWOULDBLOCK || errno == EAGAIN) return;
panic("accept4: %s", strerror(errno));
}
uint64_t request_arrival_timestamp = __getcycles();
http_total_increment_request();
/* Allocate HTTP Session */
struct http_session *session = http_session_alloc(client_socket,
(const struct sockaddr *)&client_address, tenant,
request_arrival_timestamp);
/* Receive HTTP Request */
int rc = http_session_receive_request(session, listener_thread_register_http_session);
if (rc == -3) {
continue;
} else if (rc == -2) {
debuglog("Request size exceeded Buffer\n");
http_session_send_err_oneshot(session, 413);
continue;
} else if (rc == -1) {
http_session_send_err_oneshot(session, 400);
continue;
}
assert(session->http_request.message_end);
/* Route to sandbox */
struct route *route = http_router_match_route(&tenant->router, session->http_request.full_url);
if (route == NULL) {
http_session_send_err_oneshot(session, 404);
continue;
}
/* /* Allocate HTTP Session */
* Perform admissions control. struct http_session *session = http_session_alloc(client_socket, (const struct sockaddr *)&client_address,
* If 0, workload was rejected, so close with 429 "Too Many Requests" and continue tenant, request_arrival_timestamp);
* TODO: Consider providing a Retry-After header if (likely(session != NULL)) {
*/ on_client_request_receiving(session);
uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); return;
if (work_admitted == 0) { } else {
http_session_send_err_oneshot(session, 429); /* Failed to allocate memory */
continue; debuglog("Failed to allocate http session\n");
} session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 500);
/* Allocate a Sandbox */ on_client_response_header_sending(session);
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, tenant, work_admitted); return;
if (unlikely(sandbox == NULL)) {
http_session_send_err_oneshot(session, 503);
continue;
}
/* If the global request scheduler is full, return a 429 to the client
*/
sandbox = global_request_scheduler_add(sandbox);
if (unlikely(sandbox == NULL)) {
http_session_send_err_oneshot(session, 429);
sandbox_free(sandbox);
continue;
}
} }
} }
static void static void
resume_blocked_read(struct epoll_event *evt) on_client_request_receiving(struct http_session *session)
{ {
assert((evt->events & EPOLLIN) == EPOLLIN);
/* Unpack http session from epoll event */
struct http_session *session = (struct http_session *)evt->data.ptr;
assert(session);
/* Read HTTP request */ /* Read HTTP request */
int rc = http_session_receive_request(session, listener_thread_register_http_session); int rc = http_session_receive_request(session, (void_star_cb)listener_thread_register_http_session);
if (rc == -3) { if (likely(rc == 0)) {
on_client_request_received(session);
return;
} else if (rc == -3) {
/* session blocked and registered to epoll so continue to next handle */
return; return;
} else if (rc == -2) { } else if (rc == -2) {
debuglog("Request size exceeded Buffer\n"); /* Failed to grow request buffer */
/* Request size exceeded Buffer, send 413 Payload Too Large */ debuglog("Failed to grow http request buffer\n");
listener_thread_unregister_http_session(session); session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_send_err_oneshot(session, 413); http_session_set_response_header(session, 500);
on_client_response_header_sending(session);
return; return;
} else if (rc == -1) { } else if (rc == -1) {
listener_thread_unregister_http_session(session); debuglog("Failed to receive or parse request\n");
http_session_send_err_oneshot(session, 400); session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 400);
on_client_response_header_sending(session);
return; return;
} }
assert(session->http_request.message_end); assert(0);
}
/* We read session to completion, so can remove from epoll */ static void
listener_thread_unregister_http_session(session); on_client_request_received(struct http_session *session)
{
assert(session->state == HTTP_SESSION_RECEIVED_REQUEST);
struct route *route = http_router_match_route(&session->tenant->router, session->http_request.full_url); struct route *route = http_router_match_route(&session->tenant->router, session->http_request.full_url);
if (route == NULL) { if (route == NULL) {
http_session_send_err_oneshot(session, 404); debuglog("Did not match any routes\n");
session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 404);
on_client_response_header_sending(session);
return; return;
} }
@ -242,21 +220,129 @@ resume_blocked_read(struct epoll_event *evt)
*/ */
uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate); uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate);
if (work_admitted == 0) { if (work_admitted == 0) {
http_session_send_err_oneshot(session, 429); session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 429);
on_client_response_header_sending(session);
return; return;
} }
/* Allocate a Sandbox */ /* Allocate a Sandbox */
session->state = HTTP_SESSION_EXECUTING;
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, session->tenant, work_admitted); struct sandbox *sandbox = sandbox_alloc(route->module, session, route, session->tenant, work_admitted);
if (unlikely(sandbox == NULL)) { if (unlikely(sandbox == NULL)) {
http_session_send_err_oneshot(session, 503); debuglog("Failed to allocate sandbox\n");
session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 500);
on_client_response_header_sending(session);
return; return;
} }
/* If the global request scheduler is full, return a 429 to the client */ /* If the global request scheduler is full, return a 429 to the client */
if (unlikely(global_request_scheduler_add(sandbox) == NULL)) { if (unlikely(global_request_scheduler_add(sandbox) == NULL)) {
http_session_send_err_oneshot(session, 429); debuglog("Failed to add sandbox to global queue\n");
sandbox_free(sandbox); sandbox_free(sandbox);
session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 429);
on_client_response_header_sending(session);
}
}
static void
on_client_response_header_sending(struct http_session *session)
{
assert(session->state = HTTP_SESSION_EXECUTION_COMPLETE);
session->state = HTTP_SESSION_SENDING_RESPONSE_HEADER;
int rc = http_session_send_response_header(session, (void_star_cb)listener_thread_register_http_session);
if (likely(rc == 0)) {
on_client_response_body_sending(session);
return;
} else if (rc == -3) {
/* session blocked and registered to epoll so continue to next handle */
return;
} else if (rc == -1) {
http_session_close(session);
return;
}
}
static void
on_client_response_body_sending(struct http_session *session)
{
/* Read HTTP request */
int rc = http_session_send_response_body(session, (void_star_cb)listener_thread_register_http_session);
if (likely(rc == 0)) {
on_client_response_sent(session);
return;
}
if (rc == -3) {
/* session blocked and registered to epoll so continue to next handle */
return;
} else if (rc == -1) {
http_session_close(session);
return;
}
}
static void
on_client_response_sent(struct http_session *session)
{
http_session_close(session);
http_session_free(session);
return;
}
static void
on_tenant_socket_epoll_event(struct epoll_event *evt)
{
assert((evt->events & EPOLLIN) == EPOLLIN);
struct tenant *tenant = evt->data.ptr;
assert(tenant);
/* Accept Client Request as a nonblocking socket, saving address information */
struct sockaddr_in client_address;
socklen_t address_length = sizeof(client_address);
/* Accept as many clients requests as possible, returning when we would have blocked */
while (true) {
int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address,
&address_length, SOCK_NONBLOCK);
if (unlikely(client_socket < 0)) {
if (errno == EWOULDBLOCK || errno == EAGAIN) return;
panic("accept4: %s", strerror(errno));
}
on_client_request_arrival(client_socket, (const struct sockaddr *)&client_address, tenant);
}
}
static void
on_client_socket_epoll_event(struct epoll_event *evt)
{
assert(evt);
struct http_session *session = evt->data.ptr;
assert(session);
listener_thread_unregister_http_session(session);
switch (session->state) {
case HTTP_SESSION_RECEIVE_REQUEST_BLOCKED:
assert((evt->events & EPOLLIN) == EPOLLIN);
on_client_request_receiving(session);
break;
case HTTP_SESSION_SEND_RESPONSE_HEADER_BLOCKED:
assert((evt->events & EPOLLOUT) == EPOLLOUT);
on_client_response_header_sending(session);
break;
case HTTP_SESSION_SEND_RESPONSE_BLOCKED:
assert((evt->events & EPOLLOUT) == EPOLLOUT);
on_client_response_body_sending(session);
break;
default:
panic("Invalid HTTP Session State");
} }
} }
@ -298,9 +384,9 @@ listener_thread_main(void *dummy)
panic_on_epoll_error(&epoll_events[i]); panic_on_epoll_error(&epoll_events[i]);
if (tenant_database_find_by_ptr(epoll_events[i].data.ptr) != NULL) { if (tenant_database_find_by_ptr(epoll_events[i].data.ptr) != NULL) {
handle_tcp_requests(&epoll_events[i]); on_tenant_socket_epoll_event(&epoll_events[i]);
} else { } else {
resume_blocked_read(&epoll_events[i]); on_client_socket_epoll_event(&epoll_events[i]);
} }
} }
generic_thread_dump_lock_overhead(); generic_thread_dump_lock_overhead();

@ -129,7 +129,6 @@ err_stack_allocation_failed:
err_memory_allocation_failed: err_memory_allocation_failed:
err_globals_allocation_failed: err_globals_allocation_failed:
err_http_allocation_failed: err_http_allocation_failed:
http_session_send_err_oneshot(sandbox->http, 503);
sandbox_set_as_error(sandbox, SANDBOX_ALLOCATED); sandbox_set_as_error(sandbox, SANDBOX_ALLOCATED);
perror(error_message); perror(error_message);
rc = -1; rc = -1;
@ -205,13 +204,15 @@ sandbox_deinit(struct sandbox *sandbox)
assert(sandbox != current_sandbox_get()); assert(sandbox != current_sandbox_get());
assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE); assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE);
/* Assumption: HTTP session was migrated to listener core */
assert(sandbox->http == NULL);
module_release(sandbox->module); module_release(sandbox->module);
/* Linear Memory and Guard Page should already have been munmaped and set to NULL */ /* Linear Memory and Guard Page should already have been munmaped and set to NULL */
assert(sandbox->memory == NULL); assert(sandbox->memory == NULL);
if (likely(sandbox->stack != NULL)) sandbox_free_stack(sandbox); if (likely(sandbox->stack != NULL)) sandbox_free_stack(sandbox);
if (likely(sandbox->http != NULL)) http_session_free(sandbox->http);
if (likely(sandbox->globals.buffer != NULL)) sandbox_free_globals(sandbox); if (likely(sandbox->globals.buffer != NULL)) sandbox_free_globals(sandbox);
if (likely(sandbox->wasi_context != NULL)) wasi_context_destroy(sandbox->wasi_context); if (likely(sandbox->wasi_context != NULL)) wasi_context_destroy(sandbox->wasi_context);
} }

Loading…
Cancel
Save