feat: epoll tags

master
Sean McBride 2 years ago
parent 63e38f0e7c
commit 38494da400

@ -0,0 +1,9 @@
#pragma once
enum epoll_tag
{
EPOLL_TAG_INVALID = 0,
EPOLL_TAG_TENANT_SERVER_SOCKET = 1,
EPOLL_TAG_METRICS_SERVER_SOCKET,
EPOLL_TAG_HTTP_SESSION_CLIENT_SOCKET,
};

@ -9,17 +9,18 @@
#include <sys/socket.h>
#include <unistd.h>
#include "tcp_session.h"
#include "debuglog.h"
#include "http_request.h"
#include "epoll_tag.h"
#include "http_parser.h"
#include "http_parser_settings.h"
#include "http_request.h"
#include "http_route_total.h"
#include "http_session_perf_log.h"
#include "http_total.h"
#include "route.h"
#include "http_route_total.h"
#include "tcp_session.h"
#include "tenant.h"
#include "vec.h"
#include "http_session_perf_log.h"
#define HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE (PAGE_SIZE)
#define HTTP_SESSION_RESPONSE_HEADER_CAPACITY 256
@ -45,6 +46,7 @@ enum http_session_state
};
struct http_session {
enum epoll_tag tag;
enum http_session_state state;
struct sockaddr client_address; /* client requesting connection! */
int socket;
@ -93,6 +95,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str
assert(socket_descriptor >= 0);
assert(socket_address != NULL);
session->tag = EPOLL_TAG_HTTP_SESSION_CLIENT_SOCKET;
session->tenant = tenant;
session->route = NULL;
session->socket = socket_descriptor;

@ -1,8 +1,16 @@
#pragma once
#include "epoll_tag.h"
#include "tcp_server.h"
extern struct tcp_server metrics_server;
struct metrics_server {
enum epoll_tag tag;
struct tcp_server tcp;
pthread_attr_t thread_settings;
};
extern struct metrics_server metrics_server;
void metrics_server_init();
void metrics_server_thread_spawn(int client_socket);

@ -1,5 +1,6 @@
#pragma once
#include "epoll_tag.h"
#include "http_router.h"
#include "map.h"
#include "module_database.h"
@ -32,6 +33,7 @@ struct tenant_global_request_queue {
};
struct tenant {
enum epoll_tag tag; /* Tag must be first member */
char *name;
struct tcp_server tcp_server;
http_router_t router;

@ -87,6 +87,7 @@ tenant_alloc(struct tenant_config *config)
struct tenant *tenant = (struct tenant *)calloc(1, sizeof(struct tenant));
/* Move name */
tenant->tag = EPOLL_TAG_TENANT_SERVER_SOCKET;
tenant->name = config->name;
config->name = NULL;

@ -145,7 +145,7 @@ listener_thread_register_metrics_server()
struct epoll_event accept_evt;
accept_evt.data.ptr = (void *)&metrics_server;
accept_evt.events = EPOLLIN;
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, metrics_server.socket_descriptor,
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, metrics_server.tcp.socket_descriptor,
&accept_evt);
return rc;
@ -355,7 +355,7 @@ on_metrics_server_epoll_event(struct epoll_event *evt)
/* Accept as many clients requests as possible, returning when we would have blocked */
while (true) {
/* We accept the client connection with blocking semantics because we spawn ephemeral worker threads */
int client_socket = accept4(metrics_server.socket_descriptor, (struct sockaddr *)&client_address,
int client_socket = accept4(metrics_server.tcp.socket_descriptor, (struct sockaddr *)&client_address,
&address_length, 0);
if (unlikely(client_socket < 0)) {
if (errno == EWOULDBLOCK || errno == EAGAIN) return;
@ -433,12 +433,20 @@ listener_thread_main(void *dummy)
for (int i = 0; i < descriptor_count; i++) {
panic_on_epoll_error(&epoll_events[i]);
if (epoll_events[i].data.ptr == &metrics_server) {
on_metrics_server_epoll_event(&epoll_events[i]);
} else if (tenant_database_find_by_ptr(epoll_events[i].data.ptr) != NULL) {
enum epoll_tag *tag = (enum epoll_tag *)epoll_events[i].data.ptr;
switch (*tag) {
case EPOLL_TAG_TENANT_SERVER_SOCKET:
on_tenant_socket_epoll_event(&epoll_events[i]);
} else {
break;
case EPOLL_TAG_HTTP_SESSION_CLIENT_SOCKET:
on_client_socket_epoll_event(&epoll_events[i]);
break;
case EPOLL_TAG_METRICS_SERVER_SOCKET:
on_metrics_server_epoll_event(&epoll_events[i]);
break;
default:
panic("Unknown epoll type!");
}
}
}

@ -8,6 +8,7 @@
#include "debuglog.h"
#include "http.h"
#include "http_total.h"
#include "metrics_server.h"
#include "proc_stat.h"
#include "runtime.h"
#include "sandbox_total.h"
@ -18,8 +19,7 @@
#define METRICS_SERVER_CORE_ID 0
#define METRICS_SERVER_PORT 1776
static pthread_attr_t metrics_server_thread_settings;
struct tcp_server metrics_server;
struct metrics_server metrics_server;
static void *metrics_server_handler(void *arg);
extern void metrics_server_route_level_metrics_render(FILE *ostream);
@ -27,22 +27,23 @@ extern void metrics_server_route_level_metrics_render(FILE *ostream);
void
metrics_server_init()
{
tcp_server_init(&metrics_server, METRICS_SERVER_PORT);
int rc = tcp_server_listen(&metrics_server);
metrics_server.tag = EPOLL_TAG_METRICS_SERVER_SOCKET;
tcp_server_init(&metrics_server.tcp, METRICS_SERVER_PORT);
int rc = tcp_server_listen(&metrics_server.tcp);
assert(rc == 0);
/* Configure pthread attributes to pin metrics server threads to CPU 0 */
pthread_attr_init(&metrics_server_thread_settings);
pthread_attr_init(&metrics_server.thread_settings);
cpu_set_t cs;
CPU_ZERO(&cs);
CPU_SET(METRICS_SERVER_CORE_ID, &cs);
pthread_attr_setaffinity_np(&metrics_server_thread_settings, sizeof(cpu_set_t), &cs);
pthread_attr_setaffinity_np(&metrics_server.thread_settings, sizeof(cpu_set_t), &cs);
}
int
metrics_server_close()
{
return tcp_server_close(&metrics_server);
return tcp_server_close(&metrics_server.tcp);
}
void
@ -65,7 +66,7 @@ metrics_server_thread_spawn(int client_socket)
/* Fire and forget, so we don't save the thread handles */
pthread_t metrics_server_thread;
int rc = pthread_create(&metrics_server_thread, &metrics_server_thread_settings, metrics_server_handler,
int rc = pthread_create(&metrics_server_thread, &metrics_server.thread_settings, metrics_server_handler,
(void *)(long)client_socket);
if (rc != 0) {

Loading…
Cancel
Save