From 0e521668f8cc6bbc30f4d90728250e9bd0f46ae7 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Mon, 17 Aug 2020 22:04:44 -0400 Subject: [PATCH] feat: refactor workers for nonblocking sockets --- runtime/Makefile | 1 - runtime/src/runtime.c | 17 +++++- runtime/src/sandbox.c | 130 ++++++++++++++++++++++++++---------------- 3 files changed, 94 insertions(+), 54 deletions(-) diff --git a/runtime/Makefile b/runtime/Makefile index dfce72d..c83dc17 100644 --- a/runtime/Makefile +++ b/runtime/Makefile @@ -36,7 +36,6 @@ CFLAGS += -DDEBUG # CFLAGS += -DLOG_PREEMPTION # CFLAGS += -DLOG_MODULE_LOADING # CFLAGS += -DLOG_TOTAL_REQS_RESPS -# CFLAGS += -DLOG_EPOLL # System Configuraiton Flags diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index caa364a..34a7a1a 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -80,9 +80,20 @@ listener_thread_reject(int client_socket) { assert(client_socket >= 0); - int send_rc = send(client_socket, HTTP_RESPONSE_504_SERVICE_UNAVAILABLE, - strlen(HTTP_RESPONSE_504_SERVICE_UNAVAILABLE), 0); - if (send_rc < 0) goto send_504_err; + int rc; + int sent = 0; + int to_send = strlen(HTTP_RESPONSE_504_SERVICE_UNAVAILABLE); + + while (sent < to_send) { + rc = write(client_socket, HTTP_RESPONSE_504_SERVICE_UNAVAILABLE, + strlen(HTTP_RESPONSE_504_SERVICE_UNAVAILABLE)); + if (rc < 0) { + if (errno == EAGAIN) continue; + + goto send_504_err; + } + sent += rc; + }; #ifdef LOG_TOTAL_REQS_RESPS runtime_total_5XX_responses++; diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index 1f006ff..8b793a9 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -51,24 +51,28 @@ sandbox_setup_arguments(struct sandbox *sandbox) * Run the http-parser on the sandbox's request_response_data using the configured settings global * @param sandbox the sandbox containing the req_resp data that we want to parse * @param length The size of the request_response_data that we want to parse - * @returns 0 - * + * @returns 0 on success, -1 on failure */ int sandbox_parse_http_request(struct sandbox *sandbox, size_t length) { assert(sandbox != NULL); - assert(length > 0); + + if (length == 0) return 0; + /* Why is our start address sandbox->request_response_data + sandbox->request_response_data_length? it's like a cursor to keep track of what we've read so far */ - http_parser_execute(&sandbox->http_parser, http_parser_settings_get(), - sandbox->request_response_data + sandbox->request_response_data_length, length); + size_t size_parsed = http_parser_execute(&sandbox->http_parser, http_parser_settings_get(), + sandbox->request_response_data + sandbox->request_response_data_length, + length); + + if (size_parsed != length) return -1; return 0; } /** * Receive and Parse the Request for the current sandbox - * @return 1 on success, < 0 on failure. + * @return 0 if message parsing complete, -EAGAIN if not yet complete, -1 on error */ static inline int sandbox_receive_and_parse_client_request(struct sandbox *sandbox) @@ -76,41 +80,49 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox) assert(sandbox != NULL); assert(sandbox->module->max_request_size > 0); assert(sandbox->request_response_data_length == 0); + + int rc; + #ifndef USE_HTTP_UVIO - int r = 0; - r = recv(sandbox->client_socket_descriptor, sandbox->request_response_data, sandbox->module->max_request_size, - 0); - if (r == 0) debuglog("Socket %d returned 0 bytes\n", sandbox->client_socket_descriptor); - if (r < 0) { - debuglog("Error reading socket %d - %s\n", sandbox->client_socket_descriptor, strerror(errno)); - return r; - } - while (r > 0) { - if (sandbox_parse_http_request(sandbox, r) != 0) return -1; - sandbox->request_response_data_length += r; - struct http_request *rh = &sandbox->http_request; - if (rh->message_end) break; - - r = recv(sandbox->client_socket_descriptor, - (sandbox->request_response_data + sandbox->request_response_data_length), - sandbox->module->max_request_size - sandbox->request_response_data_length, 0); - if (r < 0) { + + do { + /* Read from the Socket */ + rc = read(sandbox->client_socket_descriptor, sandbox->request_response_data, + sandbox->module->max_request_size); + if (rc < 0) { + if (errno == EAGAIN) continue; + debuglog("Error reading socket %d - %s\n", sandbox->client_socket_descriptor, strerror(errno)); - return r; + goto err; } - } + + /* Parse what we've read */ + if (sandbox_parse_http_request(sandbox, rc) == -1) { + debuglog("Error parsing socket %d\n", sandbox->client_socket_descriptor); + goto err; + } + sandbox->request_response_data_length += rc; + } while (rc > 0); + #else - int r = uv_read_start((uv_stream_t *)&sandbox->client_libuv_stream, - libuv_callbacks_on_allocate_setup_request_response_data, - libuv_callbacks_on_read_parse_http_request); + rc = uv_read_start((uv_stream_t *)&sandbox->client_libuv_stream, + libuv_callbacks_on_allocate_setup_request_response_data, + libuv_callbacks_on_read_parse_http_request); worker_thread_process_io(); #endif - if (sandbox->request_response_data_length == 0) { - debuglog("request_response_data_length was unexpectedly 0\n"); - return 0; - } + + if (!sandbox->http_request.message_end) goto eagain; + sandbox->request_length = sandbox->request_response_data_length; - return 1; + rc = 0; +done: + return rc; +eagain: + rc = -EAGAIN; + goto done; +err: + rc = -1; + goto done; } /** @@ -194,19 +206,19 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox) uint64_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz; #ifndef USE_HTTP_UVIO - int r = send(sandbox->client_socket_descriptor, sandbox->request_response_data, response_cursor, 0); - if (r < 0) { - perror("send"); - return -1; - } - while (r < response_cursor) { - int s = send(sandbox->client_socket_descriptor, sandbox->request_response_data + r, response_cursor - r, - 0); - if (s < 0) { - perror("send"); + int rc; + int sent = 0; + while (sent < response_cursor) { + rc = write(sandbox->client_socket_descriptor, sandbox->request_response_data + sent, + response_cursor - sent); + if (rc < 0) { + if (errno == EAGAIN) continue; + + perror("write"); return -1; } - r += s; + + sent += rc; } #else uv_write_t req = { @@ -287,6 +299,7 @@ current_sandbox_main(void) assert(sandbox != NULL); assert(sandbox->state == SANDBOX_RUNNING); + int rc; char *error_message = ""; assert(!software_interrupt_is_enabled()); @@ -297,9 +310,13 @@ current_sandbox_main(void) sandbox_open_http(sandbox); - /* Parse the request. 1 = Success */ - int rc = sandbox_receive_and_parse_client_request(sandbox); - if (rc != 1) { + /* Parse the request. Treat EAGAIN as an error after three retries*/ + int tries = 0; + do { + rc = sandbox_receive_and_parse_client_request(sandbox); + } while (rc == -EAGAIN && tries < 3); + + if (rc < 0) { error_message = "Unable to receive and parse client request\n"; goto err; }; @@ -349,8 +366,21 @@ done: err: fprintf(stderr, "%s", error_message); assert(sandbox->state == SANDBOX_RUNNING); - send(sandbox->client_socket_descriptor, HTTP_RESPONSE_400_BAD_REQUEST, strlen(HTTP_RESPONSE_400_BAD_REQUEST), - 0); + + int to_send = strlen(HTTP_RESPONSE_400_BAD_REQUEST); + int sent = 0; + while (sent < to_send) { + rc = write(sandbox->client_socket_descriptor, HTTP_RESPONSE_400_BAD_REQUEST, + strlen(HTTP_RESPONSE_400_BAD_REQUEST)); + if (rc < 0) { + if (errno == EAGAIN) continue; + + debuglog("Failed to send 400: %s", strerror(errno)); + if (close(sandbox->client_socket_descriptor) < 0) panic("Failed to close socket!\n"); + } + + sent += rc; + } #ifdef LOG_TOTAL_REQS_RESPS runtime_total_4XX_responses++;