feat: refactor workers for nonblocking sockets

main
Sean McBride 4 years ago
parent a7293a7a0a
commit 0e521668f8

@ -36,7 +36,6 @@ CFLAGS += -DDEBUG
# CFLAGS += -DLOG_PREEMPTION
# CFLAGS += -DLOG_MODULE_LOADING
# CFLAGS += -DLOG_TOTAL_REQS_RESPS
# CFLAGS += -DLOG_EPOLL
# System Configuraiton Flags

@ -80,9 +80,20 @@ listener_thread_reject(int client_socket)
{
assert(client_socket >= 0);
int send_rc = send(client_socket, HTTP_RESPONSE_504_SERVICE_UNAVAILABLE,
strlen(HTTP_RESPONSE_504_SERVICE_UNAVAILABLE), 0);
if (send_rc < 0) goto send_504_err;
int rc;
int sent = 0;
int to_send = strlen(HTTP_RESPONSE_504_SERVICE_UNAVAILABLE);
while (sent < to_send) {
rc = write(client_socket, HTTP_RESPONSE_504_SERVICE_UNAVAILABLE,
strlen(HTTP_RESPONSE_504_SERVICE_UNAVAILABLE));
if (rc < 0) {
if (errno == EAGAIN) continue;
goto send_504_err;
}
sent += rc;
};
#ifdef LOG_TOTAL_REQS_RESPS
runtime_total_5XX_responses++;

@ -51,24 +51,28 @@ sandbox_setup_arguments(struct sandbox *sandbox)
* Run the http-parser on the sandbox's request_response_data using the configured settings global
* @param sandbox the sandbox containing the req_resp data that we want to parse
* @param length The size of the request_response_data that we want to parse
* @returns 0
*
* @returns 0 on success, -1 on failure
*/
int
sandbox_parse_http_request(struct sandbox *sandbox, size_t length)
{
assert(sandbox != NULL);
assert(length > 0);
if (length == 0) return 0;
/* Why is our start address sandbox->request_response_data + sandbox->request_response_data_length?
it's like a cursor to keep track of what we've read so far */
http_parser_execute(&sandbox->http_parser, http_parser_settings_get(),
sandbox->request_response_data + sandbox->request_response_data_length, length);
size_t size_parsed = http_parser_execute(&sandbox->http_parser, http_parser_settings_get(),
sandbox->request_response_data + sandbox->request_response_data_length,
length);
if (size_parsed != length) return -1;
return 0;
}
/**
* Receive and Parse the Request for the current sandbox
* @return 1 on success, < 0 on failure.
* @return 0 if message parsing complete, -EAGAIN if not yet complete, -1 on error
*/
static inline int
sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
@ -76,41 +80,49 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
assert(sandbox != NULL);
assert(sandbox->module->max_request_size > 0);
assert(sandbox->request_response_data_length == 0);
int rc;
#ifndef USE_HTTP_UVIO
int r = 0;
r = recv(sandbox->client_socket_descriptor, sandbox->request_response_data, sandbox->module->max_request_size,
0);
if (r == 0) debuglog("Socket %d returned 0 bytes\n", sandbox->client_socket_descriptor);
if (r < 0) {
debuglog("Error reading socket %d - %s\n", sandbox->client_socket_descriptor, strerror(errno));
return r;
}
while (r > 0) {
if (sandbox_parse_http_request(sandbox, r) != 0) return -1;
sandbox->request_response_data_length += r;
struct http_request *rh = &sandbox->http_request;
if (rh->message_end) break;
r = recv(sandbox->client_socket_descriptor,
(sandbox->request_response_data + sandbox->request_response_data_length),
sandbox->module->max_request_size - sandbox->request_response_data_length, 0);
if (r < 0) {
do {
/* Read from the Socket */
rc = read(sandbox->client_socket_descriptor, sandbox->request_response_data,
sandbox->module->max_request_size);
if (rc < 0) {
if (errno == EAGAIN) continue;
debuglog("Error reading socket %d - %s\n", sandbox->client_socket_descriptor, strerror(errno));
return r;
goto err;
}
}
/* Parse what we've read */
if (sandbox_parse_http_request(sandbox, rc) == -1) {
debuglog("Error parsing socket %d\n", sandbox->client_socket_descriptor);
goto err;
}
sandbox->request_response_data_length += rc;
} while (rc > 0);
#else
int r = uv_read_start((uv_stream_t *)&sandbox->client_libuv_stream,
libuv_callbacks_on_allocate_setup_request_response_data,
libuv_callbacks_on_read_parse_http_request);
rc = uv_read_start((uv_stream_t *)&sandbox->client_libuv_stream,
libuv_callbacks_on_allocate_setup_request_response_data,
libuv_callbacks_on_read_parse_http_request);
worker_thread_process_io();
#endif
if (sandbox->request_response_data_length == 0) {
debuglog("request_response_data_length was unexpectedly 0\n");
return 0;
}
if (!sandbox->http_request.message_end) goto eagain;
sandbox->request_length = sandbox->request_response_data_length;
return 1;
rc = 0;
done:
return rc;
eagain:
rc = -EAGAIN;
goto done;
err:
rc = -1;
goto done;
}
/**
@ -194,19 +206,19 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox)
uint64_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz;
#ifndef USE_HTTP_UVIO
int r = send(sandbox->client_socket_descriptor, sandbox->request_response_data, response_cursor, 0);
if (r < 0) {
perror("send");
return -1;
}
while (r < response_cursor) {
int s = send(sandbox->client_socket_descriptor, sandbox->request_response_data + r, response_cursor - r,
0);
if (s < 0) {
perror("send");
int rc;
int sent = 0;
while (sent < response_cursor) {
rc = write(sandbox->client_socket_descriptor, sandbox->request_response_data + sent,
response_cursor - sent);
if (rc < 0) {
if (errno == EAGAIN) continue;
perror("write");
return -1;
}
r += s;
sent += rc;
}
#else
uv_write_t req = {
@ -287,6 +299,7 @@ current_sandbox_main(void)
assert(sandbox != NULL);
assert(sandbox->state == SANDBOX_RUNNING);
int rc;
char *error_message = "";
assert(!software_interrupt_is_enabled());
@ -297,9 +310,13 @@ current_sandbox_main(void)
sandbox_open_http(sandbox);
/* Parse the request. 1 = Success */
int rc = sandbox_receive_and_parse_client_request(sandbox);
if (rc != 1) {
/* Parse the request. Treat EAGAIN as an error after three retries*/
int tries = 0;
do {
rc = sandbox_receive_and_parse_client_request(sandbox);
} while (rc == -EAGAIN && tries < 3);
if (rc < 0) {
error_message = "Unable to receive and parse client request\n";
goto err;
};
@ -349,8 +366,21 @@ done:
err:
fprintf(stderr, "%s", error_message);
assert(sandbox->state == SANDBOX_RUNNING);
send(sandbox->client_socket_descriptor, HTTP_RESPONSE_400_BAD_REQUEST, strlen(HTTP_RESPONSE_400_BAD_REQUEST),
0);
int to_send = strlen(HTTP_RESPONSE_400_BAD_REQUEST);
int sent = 0;
while (sent < to_send) {
rc = write(sandbox->client_socket_descriptor, HTTP_RESPONSE_400_BAD_REQUEST,
strlen(HTTP_RESPONSE_400_BAD_REQUEST));
if (rc < 0) {
if (errno == EAGAIN) continue;
debuglog("Failed to send 400: %s", strerror(errno));
if (close(sandbox->client_socket_descriptor) < 0) panic("Failed to close socket!\n");
}
sent += rc;
}
#ifdef LOG_TOTAL_REQS_RESPS
runtime_total_4XX_responses++;

Loading…
Cancel
Save