|
|
|
@ -9,6 +9,8 @@
|
|
|
|
|
#include "runtime.h"
|
|
|
|
|
#include "sandbox_functions.h"
|
|
|
|
|
#include "tcp_session.h"
|
|
|
|
|
#include "tenant.h"
|
|
|
|
|
#include "tenant_functions.h"
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Descriptor of the epoll instance used to monitor the socket descriptors of registered
|
|
|
|
@ -44,6 +46,41 @@ listener_thread_initialize(void)
|
|
|
|
|
printf("\tListener core thread: %lx\n", listener_thread_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief Registers a serverless tenant on the listener thread's epoll descriptor
|
|
|
|
|
**/
|
|
|
|
|
void
|
|
|
|
|
listener_thread_register_http_session(struct http_session *http)
|
|
|
|
|
{
|
|
|
|
|
assert(http != NULL);
|
|
|
|
|
|
|
|
|
|
if (unlikely(listener_thread_epoll_file_descriptor == 0)) {
|
|
|
|
|
panic("Attempting to register an http session before listener thread initialization");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int rc = 0;
|
|
|
|
|
struct epoll_event accept_evt;
|
|
|
|
|
accept_evt.data.ptr = (void *)http;
|
|
|
|
|
accept_evt.events = EPOLLIN;
|
|
|
|
|
|
|
|
|
|
epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, http->socket, &accept_evt);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief Registers a serverless tenant on the listener thread's epoll descriptor
|
|
|
|
|
**/
|
|
|
|
|
void
|
|
|
|
|
listener_thread_unregister_http_session(struct http_session *http)
|
|
|
|
|
{
|
|
|
|
|
assert(http != NULL);
|
|
|
|
|
|
|
|
|
|
if (unlikely(listener_thread_epoll_file_descriptor == 0)) {
|
|
|
|
|
panic("Attempting to unregister an http session before listener thread initialization");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_DEL, http->socket, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief Registers a serverless tenant on the listener thread's epoll descriptor
|
|
|
|
|
**/
|
|
|
|
@ -65,6 +102,162 @@ listener_thread_register_tenant(struct tenant *tenant)
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
panic_on_epoll_error(struct epoll_event *evt)
|
|
|
|
|
{
|
|
|
|
|
/* Check Event to determine if epoll returned an error */
|
|
|
|
|
if ((evt->events & EPOLLERR) == EPOLLERR) {
|
|
|
|
|
int error = 0;
|
|
|
|
|
socklen_t errlen = sizeof(error);
|
|
|
|
|
if (getsockopt(evt->data.fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen) == 0) {
|
|
|
|
|
panic("epoll_wait: %s\n", strerror(error));
|
|
|
|
|
}
|
|
|
|
|
panic("epoll_wait");
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
handle_tcp_requests(struct epoll_event *evt)
|
|
|
|
|
{
|
|
|
|
|
assert((evt->events & EPOLLIN) == EPOLLIN);
|
|
|
|
|
|
|
|
|
|
/* Unpack tenant from epoll event */
|
|
|
|
|
struct tenant *tenant = (struct tenant *)evt->data.ptr;
|
|
|
|
|
assert(tenant);
|
|
|
|
|
|
|
|
|
|
/* Accept Client Request as a nonblocking socket, saving address information */
|
|
|
|
|
struct sockaddr_in client_address;
|
|
|
|
|
socklen_t address_length = sizeof(client_address);
|
|
|
|
|
|
|
|
|
|
/* Accept as many requests as possible, returning when we would have blocked */
|
|
|
|
|
while (true) {
|
|
|
|
|
int client_socket = accept4(tenant->tcp_server.socket_descriptor, (struct sockaddr *)&client_address,
|
|
|
|
|
&address_length, SOCK_NONBLOCK);
|
|
|
|
|
if (unlikely(client_socket < 0)) {
|
|
|
|
|
if (errno == EWOULDBLOCK || errno == EAGAIN) return;
|
|
|
|
|
|
|
|
|
|
panic("accept4: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint64_t request_arrival_timestamp = __getcycles();
|
|
|
|
|
|
|
|
|
|
http_total_increment_request();
|
|
|
|
|
|
|
|
|
|
/* Allocate HTTP Session */
|
|
|
|
|
struct http_session *session = http_session_alloc(client_socket,
|
|
|
|
|
(const struct sockaddr *)&client_address, tenant,
|
|
|
|
|
request_arrival_timestamp);
|
|
|
|
|
|
|
|
|
|
/* Receive HTTP Request */
|
|
|
|
|
int rc = http_session_receive_request(session, listener_thread_register_http_session);
|
|
|
|
|
if (rc == -3) {
|
|
|
|
|
continue;
|
|
|
|
|
} else if (rc == -2) {
|
|
|
|
|
debuglog("Request size exceeded Buffer\n");
|
|
|
|
|
http_session_send_err_oneshot(session, 413);
|
|
|
|
|
continue;
|
|
|
|
|
} else if (rc == -1) {
|
|
|
|
|
http_session_send_err_oneshot(session, 400);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Route to sandbox */
|
|
|
|
|
struct route *route = http_router_match_route(&tenant->router, session->http_request.full_url);
|
|
|
|
|
if (route == NULL) {
|
|
|
|
|
http_session_send_err_oneshot(session, 404);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Perform admissions control.
|
|
|
|
|
* If 0, workload was rejected, so close with 429 "Too Many Requests" and continue
|
|
|
|
|
* TODO: Consider providing a Retry-After header
|
|
|
|
|
*/
|
|
|
|
|
uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate);
|
|
|
|
|
if (work_admitted == 0) {
|
|
|
|
|
http_session_send_err_oneshot(session, 429);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Allocate a Sandbox */
|
|
|
|
|
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, tenant, work_admitted);
|
|
|
|
|
if (unlikely(sandbox == NULL)) {
|
|
|
|
|
http_session_send_err_oneshot(session, 503);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* If the global request scheduler is full, return a 429 to the client
|
|
|
|
|
*/
|
|
|
|
|
sandbox = global_request_scheduler_add(sandbox);
|
|
|
|
|
if (unlikely(sandbox == NULL)) {
|
|
|
|
|
http_session_send_err_oneshot(session, 429);
|
|
|
|
|
sandbox_free(sandbox);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
resume_blocked_read(struct epoll_event *evt)
|
|
|
|
|
{
|
|
|
|
|
assert((evt->events & EPOLLIN) == EPOLLIN);
|
|
|
|
|
|
|
|
|
|
/* Unpack http session from epoll event */
|
|
|
|
|
struct http_session *session = (struct http_session *)evt->data.ptr;
|
|
|
|
|
assert(session);
|
|
|
|
|
|
|
|
|
|
/* Read HTTP request */
|
|
|
|
|
int rc = http_session_receive_request(session, listener_thread_register_http_session);
|
|
|
|
|
if (rc == -3) {
|
|
|
|
|
return;
|
|
|
|
|
} else if (rc == -2) {
|
|
|
|
|
debuglog("Request size exceeded Buffer\n");
|
|
|
|
|
/* Request size exceeded Buffer, send 413 Payload Too Large */
|
|
|
|
|
listener_thread_unregister_http_session(session);
|
|
|
|
|
http_session_send_err_oneshot(session, 413);
|
|
|
|
|
return;
|
|
|
|
|
} else if (rc == -1) {
|
|
|
|
|
listener_thread_unregister_http_session(session);
|
|
|
|
|
http_session_send_err_oneshot(session, 400);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* We read session to completion, so can remote from epoll */
|
|
|
|
|
listener_thread_unregister_http_session(session);
|
|
|
|
|
|
|
|
|
|
struct route *route = http_router_match_route(&session->tenant->router, session->http_request.full_url);
|
|
|
|
|
if (route == NULL) {
|
|
|
|
|
http_session_send_err_oneshot(session, 404);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Perform admissions control.
|
|
|
|
|
* If 0, workload was rejected, so close with 429 "Too Many Requests" and continue
|
|
|
|
|
* TODO: Consider providing a Retry-After header
|
|
|
|
|
*/
|
|
|
|
|
uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate);
|
|
|
|
|
if (work_admitted == 0) {
|
|
|
|
|
http_session_send_err_oneshot(session, 429);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Allocate a Sandbox */
|
|
|
|
|
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, session->tenant, work_admitted);
|
|
|
|
|
if (unlikely(sandbox == NULL)) {
|
|
|
|
|
http_session_send_err_oneshot(session, 503);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* If the global request scheduler is full, return a 429 to the client
|
|
|
|
|
*/
|
|
|
|
|
sandbox = global_request_scheduler_add(sandbox);
|
|
|
|
|
if (unlikely(sandbox == NULL)) {
|
|
|
|
|
http_session_send_err_oneshot(session, 429);
|
|
|
|
|
sandbox_free(sandbox);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @brief Execution Loop of the listener core, io_handles HTTP requests, allocates sandbox request objects, and
|
|
|
|
|
* pushes the sandbox object to the global dequeue
|
|
|
|
@ -87,10 +280,7 @@ listener_thread_main(void *dummy)
|
|
|
|
|
pthread_setschedprio(pthread_self(), -20);
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
/*
|
|
|
|
|
* Block indefinitely on the epoll file descriptor, waiting on up to a max number of events
|
|
|
|
|
* TODO: Is RUNTIME_MAX_EPOLL_EVENTS actually limited to the max number of modules?
|
|
|
|
|
*/
|
|
|
|
|
/* Block indefinitely on the epoll file descriptor, waiting on up to a max number of events */
|
|
|
|
|
int descriptor_count = epoll_wait(listener_thread_epoll_file_descriptor, epoll_events,
|
|
|
|
|
RUNTIME_MAX_EPOLL_EVENTS, -1);
|
|
|
|
|
if (descriptor_count < 0) {
|
|
|
|
@ -98,138 +288,21 @@ listener_thread_main(void *dummy)
|
|
|
|
|
|
|
|
|
|
panic("epoll_wait: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
/* Assumption: Because epoll_wait is set to not timeout, we should always have descriptors here
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/* Assumption: Because epoll_wait is set to not timeout, we should always have descriptors here */
|
|
|
|
|
assert(descriptor_count > 0);
|
|
|
|
|
|
|
|
|
|
uint64_t request_arrival_timestamp = __getcycles();
|
|
|
|
|
for (int i = 0; i < descriptor_count; i++) {
|
|
|
|
|
/* Check Event to determine if epoll returned an error */
|
|
|
|
|
if ((epoll_events[i].events & EPOLLERR) == EPOLLERR) {
|
|
|
|
|
int error = 0;
|
|
|
|
|
socklen_t errlen = sizeof(error);
|
|
|
|
|
if (getsockopt(epoll_events[i].data.fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen)
|
|
|
|
|
== 0) {
|
|
|
|
|
panic("epoll_wait: %s\n", strerror(error));
|
|
|
|
|
}
|
|
|
|
|
panic("epoll_wait");
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* Assumption: We have only registered EPOLLIN events, so we should see no others here
|
|
|
|
|
*/
|
|
|
|
|
assert((epoll_events[i].events & EPOLLIN) == EPOLLIN);
|
|
|
|
|
|
|
|
|
|
/* Unpack tenant from epoll event */
|
|
|
|
|
struct tenant *tenant = (struct tenant *)epoll_events[i].data.ptr;
|
|
|
|
|
assert(tenant);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* I don't think we're responsible to cleanup epoll events, but clearing to trigger
|
|
|
|
|
* the assertion just in case.
|
|
|
|
|
*/
|
|
|
|
|
epoll_events[i].data.ptr = NULL;
|
|
|
|
|
|
|
|
|
|
/* Accept Client Request as a nonblocking socket, saving address information */
|
|
|
|
|
struct sockaddr_in client_address;
|
|
|
|
|
socklen_t address_length = sizeof(client_address);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Accept as many requests as possible, terminating when we would have blocked
|
|
|
|
|
* This inner loop is used in case there are more datagrams than epoll events for some
|
|
|
|
|
* reason
|
|
|
|
|
*/
|
|
|
|
|
while (true) {
|
|
|
|
|
int client_socket = accept4(tenant->tcp_server.socket_descriptor,
|
|
|
|
|
(struct sockaddr *)&client_address, &address_length,
|
|
|
|
|
SOCK_NONBLOCK);
|
|
|
|
|
if (unlikely(client_socket < 0)) {
|
|
|
|
|
if (errno == EWOULDBLOCK || errno == EAGAIN) break;
|
|
|
|
|
|
|
|
|
|
panic("accept4: %s", strerror(errno));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* We should never have accepted on fd 0, 1, or 2 */
|
|
|
|
|
assert(client_socket != STDIN_FILENO);
|
|
|
|
|
assert(client_socket != STDOUT_FILENO);
|
|
|
|
|
assert(client_socket != STDERR_FILENO);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* According to accept(2), it is possible that the the sockaddr structure
|
|
|
|
|
* client_address may be too small, resulting in data being truncated to fit.
|
|
|
|
|
* The accept call mutates the size value to indicate that this is the case.
|
|
|
|
|
*/
|
|
|
|
|
if (address_length > sizeof(client_address)) {
|
|
|
|
|
debuglog("Client address %s truncated because buffer was too small\n",
|
|
|
|
|
tenant->name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
http_total_increment_request();
|
|
|
|
|
|
|
|
|
|
/* Allocate HTTP Session */
|
|
|
|
|
struct http_session *session =
|
|
|
|
|
http_session_alloc(client_socket, (const struct sockaddr *)&client_address);
|
|
|
|
|
|
|
|
|
|
/* Read HTTP request */
|
|
|
|
|
/* TODO: Use epoll on block instead of busy looping */
|
|
|
|
|
int rc = 0;
|
|
|
|
|
while ((rc = http_session_receive_request(session, NULL)) == -3)
|
|
|
|
|
;
|
|
|
|
|
|
|
|
|
|
if (rc == -2) {
|
|
|
|
|
debuglog("Request size exceeded Buffer\n");
|
|
|
|
|
/* Request size exceeded Buffer, send 413 Payload Too Large */
|
|
|
|
|
http_session_send_err_oneshot(session, 413);
|
|
|
|
|
http_session_close(session);
|
|
|
|
|
continue;
|
|
|
|
|
} else if (rc == -1) {
|
|
|
|
|
http_session_send_err_oneshot(session, 400);
|
|
|
|
|
http_session_close(session);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct route *route = http_router_match_route(&tenant->router,
|
|
|
|
|
session->http_request.full_url);
|
|
|
|
|
if (route == NULL) {
|
|
|
|
|
http_session_send_err_oneshot(session, 404);
|
|
|
|
|
http_session_close(session);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Perform admissions control.
|
|
|
|
|
* If 0, workload was rejected, so close with 429 "Too Many Requests"
|
|
|
|
|
and continue
|
|
|
|
|
* TODO: Consider providing a Retry-After header
|
|
|
|
|
*/
|
|
|
|
|
uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate);
|
|
|
|
|
if (work_admitted == 0) {
|
|
|
|
|
tcp_session_send_oneshot(client_socket, http_header_build(429),
|
|
|
|
|
http_header_len(429));
|
|
|
|
|
http_session_close(session);
|
|
|
|
|
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Allocate a Sandbox */
|
|
|
|
|
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, tenant,
|
|
|
|
|
request_arrival_timestamp, work_admitted);
|
|
|
|
|
if (unlikely(sandbox == NULL)) {
|
|
|
|
|
http_session_send_err_oneshot(sandbox->http, 503);
|
|
|
|
|
http_session_close(sandbox->http);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* If the global request scheduler is full, return a 429 to the client
|
|
|
|
|
*/
|
|
|
|
|
sandbox = global_request_scheduler_add(sandbox);
|
|
|
|
|
if (unlikely(sandbox == NULL)) {
|
|
|
|
|
http_session_send_err_oneshot(sandbox->http, 429);
|
|
|
|
|
http_session_close(sandbox->http);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} /* while true */
|
|
|
|
|
} /* for loop */
|
|
|
|
|
panic_on_epoll_error(&epoll_events[i]);
|
|
|
|
|
|
|
|
|
|
if (tenant_database_find_by_ptr(epoll_events[i].data.ptr) != NULL) {
|
|
|
|
|
handle_tcp_requests(&epoll_events[i]);
|
|
|
|
|
} else {
|
|
|
|
|
resume_blocked_read(&epoll_events[i]);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
generic_thread_dump_lock_overhead();
|
|
|
|
|
} /* while true */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
panic("Listener thread unexpectedly broke loop\n");
|
|
|
|
|
}
|
|
|
|
|