feat: partial tenant implementation

master
Sean McBride 3 years ago
parent 7a62b154fc
commit 01cca785f4

@ -60,7 +60,6 @@ BINARY_NAME=sledgert
# CFLAGS += -DLOG_CONTEXT_SWITCHES
# CFLAGS += -DLOG_HTTP_PARSER
# CFLAGS += -DLOG_LOCK_OVERHEAD
# CFLAGS += -DLOG_MODULE_LOADING
# CFLAGS += -DLOG_PREEMPTION
# CFLAGS += -DLOG_SANDBOX_ALLOCATION
# CFLAGS += -DLOG_UNSUPPORTED_WASI

@ -0,0 +1,86 @@
#pragma once
#include <stdlib.h>
#include <string.h>
#include "admissions_info.h"
#include "http.h"
#include "route_config.h"
#include "module_database.h"
#include "tcp_server.h"
#define HTTP_ROUTER_CAPACITY 32
/* Assumption: entrypoint is always _start. This should be enhanced later */
struct route {
char *route;
struct module *module;
/* HTTP State */
uint32_t relative_deadline_us;
uint64_t relative_deadline; /* cycles */
size_t max_request_size;
size_t max_response_size;
char *response_content_type;
struct admissions_info admissions_info;
};
struct http_router {
struct route routes[HTTP_ROUTER_CAPACITY];
size_t route_length;
};
static inline void
http_router_init(struct http_router *router)
{
router->route_length = 0;
}
static inline int
http_router_add_route(struct http_router *router, struct route_config *config, struct module *module)
{
assert(router != NULL);
assert(config != NULL);
assert(module != NULL);
assert(config->route != NULL);
assert(config->http_resp_content_type != NULL);
if (router->route_length < HTTP_ROUTER_CAPACITY) {
router->routes[router->route_length] = (struct route){
.route = config->route,
.module = module,
.relative_deadline_us = config->relative_deadline_us,
.relative_deadline = (uint64_t)config->relative_deadline_us * runtime_processor_speed_MHz,
.max_request_size = config->http_req_size,
.max_response_size = config->http_resp_size,
.response_content_type = config->http_resp_content_type
};
/* Move strings from config */
config->route = NULL;
config->http_resp_content_type = NULL;
/* Admissions Control */
uint64_t expected_execution = (uint64_t)config->expected_execution_us * runtime_processor_speed_MHz;
admissions_info_initialize(&router->routes[router->route_length].admissions_info,
config->admissions_percentile, expected_execution,
router->routes[router->route_length].relative_deadline);
router->route_length++;
return 0;
}
return -1;
}
static inline struct route *
http_router_match_route(struct http_router *router, char *route)
{
for (int i = 0; i < router->route_length; i++) {
if (strncmp(route, router->routes[i].route, strlen(router->routes[i].route)) == 0) {
return &router->routes[i];
}
}
return NULL;
}

@ -16,6 +16,8 @@
#include "http_parser_settings.h"
#include "vec.h"
#define HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE (PAGE_SIZE)
#define u8 uint8_t
VEC(u8)
@ -34,8 +36,7 @@ struct http_session {
* @returns 0 on success, -1 on error
*/
static inline int
http_session_init(struct http_session *session, size_t max_request_size, size_t max_response_size,
int socket_descriptor, const struct sockaddr *socket_address)
http_session_init(struct http_session *session, int socket_descriptor, const struct sockaddr *socket_address)
{
assert(session != NULL);
assert(socket_address != NULL);
@ -49,10 +50,10 @@ http_session_init(struct http_session *session, size_t max_request_size, size_t
session->http_parser.data = &session->http_request;
int rc;
rc = vec_u8_init(&session->request, max_request_size);
rc = vec_u8_init(&session->request, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE);
if (rc < 0) return -1;
rc = vec_u8_init(&session->response, max_response_size);
rc = vec_u8_init(&session->response, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE);
if (rc < 0) {
vec_u8_deinit(&session->request);
return -1;
@ -62,13 +63,12 @@ http_session_init(struct http_session *session, size_t max_request_size, size_t
}
static inline struct http_session *
http_session_alloc(size_t max_request_size, size_t max_response_size, int socket_descriptor,
const struct sockaddr *socket_address)
http_session_alloc(int socket_descriptor, const struct sockaddr *socket_address)
{
struct http_session *session = calloc(sizeof(struct http_session), 1);
if (session == NULL) return NULL;
int rc = http_session_init(session, max_request_size, max_response_size, socket_descriptor, socket_address);
int rc = http_session_init(session, socket_descriptor, socket_address);
if (rc != 0) {
free(session);
return NULL;
@ -174,16 +174,15 @@ http_session_receive(struct http_session *session, void_cb on_eagain)
http_parser *parser = &session->http_parser;
const http_parser_settings *settings = http_parser_settings_get();
size_t request_length = request->length;
size_t request_capacity = request->capacity;
if (request_length >= request_capacity) {
debuglog("Ran out of Request Buffer before message end\n");
goto err_nobufs;
if (request->length == request->capacity) {
if (vec_u8_grow(request) != 0) {
debuglog("Ran out of Request Buffer before message end\n");
goto err_nobufs;
}
}
ssize_t bytes_received = recv(session->socket, &request->buffer[request_length],
request_capacity - request_length, 0);
ssize_t bytes_received = recv(session->socket, &request->buffer[request->length],
request->capacity - request->length, 0);
if (bytes_received < 0) {
if (errno == EAGAIN) {
@ -220,7 +219,7 @@ http_session_receive(struct http_session *session, void_cb on_eagain)
&session->request.buffer[session->request.length], bytes_received);
#endif
size_t bytes_parsed = http_parser_execute(parser, settings,
(const char *)&request->buffer[request_length],
(const char *)&request->buffer[request->length],
(size_t)bytes_received);
if (bytes_parsed != (size_t)bytes_received) {

@ -1,6 +1,6 @@
#pragma once
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdlib.h>
@ -9,39 +9,6 @@
#include <string.h>
#include <jsmn.h>
#include "runtime.h"
#include "http.h"
#include "module.h"
#include "module_config.h"
#define JSON_TOKENS_CAPACITY 16384
enum
{
module_name,
module_path,
module_port,
module_route,
module_expected_execution_us,
module_admissions_percentile,
module_relative_deadline_us,
module_http_req_size,
module_http_resp_size,
module_http_resp_content_type,
module_keys_len
};
static const char *module_keys[module_keys_len] = { "name",
"path",
"port",
"route",
"expected-execution-us",
"admissions-percentile",
"relative-deadline-us",
"http-req-size",
"http-resp-size",
"http-resp-content-type" };
static inline char *
jsmn_type(jsmntype_t type)
{
@ -73,11 +40,14 @@ has_valid_size(jsmntok_t tok, char *key, int expected_size)
}
static inline bool
has_valid_type(jsmntok_t tok, char *key, jsmntype_t expected_type)
has_valid_type(jsmntok_t tok, char *key, jsmntype_t expected_type, const char *json_buf)
{
if (tok.type != expected_type) {
fprintf(stderr, "The value of the key %s should be a %s, was a %s\n", key, jsmn_type(expected_type),
jsmn_type(tok.type));
if (json_buf != NULL)
fprintf(stderr, "String of value %.*s\n", tok.end - tok.start, &json_buf[tok.start]);
return false;
}
@ -87,7 +57,7 @@ has_valid_type(jsmntok_t tok, char *key, jsmntype_t expected_type)
static inline bool
is_nonempty_string(jsmntok_t tok, char *key)
{
if (!has_valid_type(tok, key, JSMN_STRING)) return false;
if (!has_valid_type(tok, key, JSMN_STRING, NULL)) return false;
if (tok.end - tok.start < 1) {
fprintf(stderr, "The value of the key %s was an empty string\n", key);
@ -97,6 +67,32 @@ is_nonempty_string(jsmntok_t tok, char *key)
return true;
}
static inline bool
is_nonempty_object(jsmntok_t tok, char *key)
{
if (!has_valid_type(tok, key, JSMN_OBJECT, NULL)) return false;
if (tok.size == 0) {
fprintf(stderr, "The value of the key %s was an empty object\n", key);
return false;
}
return true;
}
static inline bool
is_nonempty_array(jsmntok_t tok, char *key)
{
if (!has_valid_type(tok, key, JSMN_ARRAY, NULL)) return false;
if (tok.size == 0) {
fprintf(stderr, "The value of the key %s was an empty array\n", key);
return false;
}
return true;
}
static inline bool
is_valid_key(jsmntok_t tok)
{
@ -158,156 +154,17 @@ parse_uint32_t(jsmntok_t tok, const char *json_buf, const char *key, uint32_t *r
return 0;
}
/**
* Parses a JSON file into an array of module configs
* @param file_name The path of the JSON file
* @return module_config_vec_len on success. -1 on Error
*/
static inline int
parse_json(const char *json_buf, ssize_t json_buf_size, struct module_config **module_config_vec)
parse_uint64_t(jsmntok_t tok, const char *json_buf, const char *key, uint64_t *ret)
{
assert(json_buf != NULL);
assert(json_buf_size > 0);
assert(module_config_vec != NULL);
jsmntok_t tokens[JSON_TOKENS_CAPACITY];
int module_config_vec_len = 0;
/* Initialize the Jasmine Parser and an array to hold the tokens */
jsmn_parser module_parser;
jsmn_init(&module_parser);
/* Use Jasmine to parse the JSON */
int total_tokens = jsmn_parse(&module_parser, json_buf, json_buf_size, tokens, JSON_TOKENS_CAPACITY);
if (total_tokens < 0) {
if (total_tokens == JSMN_ERROR_INVAL) {
fprintf(stderr, "Error parsing %s: bad token, JSON string is corrupted\n", json_buf);
} else if (total_tokens == JSMN_ERROR_PART) {
fprintf(stderr, "Error parsing %s: JSON string is too short, expecting more JSON data\n",
json_buf);
} else if (total_tokens == JSMN_ERROR_NOMEM) {
/*
* According to the README at https://github.com/zserge/jsmn, this is a potentially recoverable
* error. More tokens can be allocated and jsmn_parse can be re-invoked.
*/
fprintf(stderr, "Error parsing %s: Not enough tokens, JSON string is too large\n", json_buf);
}
goto err;
}
if (tokens[0].type != JSMN_ARRAY) {
fprintf(stderr, "Outermost Config should be a JSON array, was a JSON %s\n", jsmn_type(tokens[0].type));
goto err;
}
module_config_vec_len = tokens[0].size;
if (module_config_vec_len == 0) {
fprintf(stderr, "Config is an empty JSON array\n");
goto err;
}
char *end = NULL;
uintmax_t temp = strtoimax(&json_buf[tok.start], &end, 10);
*module_config_vec = (struct module_config *)calloc(module_config_vec_len, sizeof(struct module_config));
int module_idx = -1;
int module_fields_remaining = 0;
for (int i = 1; i < total_tokens; i++) {
char key[32] = { 0 };
/* Assumption: Objects are never used within a module_config. This likely will not be true in the
* future due to routes or multiple entrypoints */
if (tokens[i].type == JSMN_OBJECT) {
assert(module_fields_remaining == 0);
module_fields_remaining = tokens[i].size;
module_idx++;
} else {
/* Inside Object */
/* Validate that key is non-emptry string */
if (!is_valid_key(tokens[i])) goto json_parse_err;
sprintf(key, "%.*s", tokens[i].end - tokens[i].start, json_buf + tokens[i].start);
/* Validate that key has a value */
if (!has_valid_size(tokens[i], key, 1)) goto json_parse_err;
/* Advance to Value */
i++;
if (strcmp(key, module_keys[module_name]) == 0) {
if (!is_nonempty_string(tokens[i], key)) goto json_parse_err;
(*module_config_vec)[module_idx].name = strndup(json_buf + tokens[i].start,
tokens[i].end - tokens[i].start);
} else if (strcmp(key, module_keys[module_path]) == 0) {
if (!is_nonempty_string(tokens[i], key)) goto json_parse_err;
(*module_config_vec)[module_idx].path = strndup(json_buf + tokens[i].start,
tokens[i].end - tokens[i].start);
} else if (strcmp(key, module_keys[module_port]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE)) goto json_parse_err;
int rc = parse_uint16_t(tokens[i], json_buf, module_keys[module_port],
&(*module_config_vec)[module_idx].port);
if (rc < 0) goto json_parse_err;
} else if (strcmp(key, module_keys[module_route]) == 0) {
if (!is_nonempty_string(tokens[i], key)) goto json_parse_err;
(*module_config_vec)[module_idx].route = strndup(json_buf + tokens[i].start,
tokens[i].end - tokens[i].start);
} else if (strcmp(key, module_keys[module_expected_execution_us]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE)) goto json_parse_err;
int rc = parse_uint32_t(tokens[i], json_buf, module_keys[module_expected_execution_us],
&(*module_config_vec)[module_idx].expected_execution_us);
if (rc < 0) goto json_parse_err;
} else if (strcmp(key, module_keys[module_admissions_percentile]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE)) goto json_parse_err;
int rc = parse_uint8_t(tokens[i], json_buf, module_keys[module_admissions_percentile],
&(*module_config_vec)[module_idx].admissions_percentile);
if (rc < 0) goto json_parse_err;
} else if (strcmp(key, module_keys[module_relative_deadline_us]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE)) goto json_parse_err;
int rc = parse_uint32_t(tokens[i], json_buf, module_keys[module_relative_deadline_us],
&(*module_config_vec)[module_idx].relative_deadline_us);
if (rc < 0) goto json_parse_err;
} else if (strcmp(key, module_keys[module_http_req_size]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE)) goto json_parse_err;
int rc = parse_uint32_t(tokens[i], json_buf, module_keys[module_http_req_size],
&(*module_config_vec)[module_idx].http_req_size);
if (rc < 0) goto json_parse_err;
} else if (strcmp(key, module_keys[module_http_resp_size]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE)) goto json_parse_err;
int rc = parse_uint32_t(tokens[i], json_buf, module_keys[module_http_resp_size],
&(*module_config_vec)[module_idx].http_resp_size);
if (rc < 0) goto json_parse_err;
} else if (strcmp(key, module_keys[module_http_resp_content_type]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_STRING)) goto json_parse_err;
(*module_config_vec)[module_idx].http_resp_content_type =
strndup(json_buf + tokens[i].start, tokens[i].end - tokens[i].start);
} else {
fprintf(stderr, "%s is not a valid key\n", key);
goto json_parse_err;
}
module_fields_remaining--;
}
if (end != &json_buf[tok.end] || temp < 0 || temp > UINT64_MAX) {
fprintf(stderr, "Unable to parse uint32_t for key %s\n", key);
return -1;
}
#ifdef LOG_MODULE_LOADING
debuglog("Loaded %d module%s!\n", module_count, module_count > 1 ? "s" : "");
#endif
done:
return module_config_vec_len;
json_parse_err:
free(*module_config_vec);
err:
fprintf(stderr, "JSON:\n%s\n", json_buf);
module_config_vec_len = -1;
goto done;
*ret = (uint64_t)temp;
return 0;
}

@ -0,0 +1,62 @@
#pragma once
#include <assert.h>
#include <jsmn.h>
#include <stdio.h>
#include <stdlib.h>
#include "tenant_config_parse.h"
#define JSON_TOKENS_CAPACITY 16384
/**
* Parses a JSON file into an array of tenant configs
* @param file_name The path of the JSON file
* @return tenant_config_vec_len on success. -1 on Error
*/
static inline int
parse_json(const char *json_buf, ssize_t json_buf_size, struct tenant_config **tenant_config_vec)
{
assert(json_buf != NULL);
assert(json_buf_size > 0);
assert(tenant_config_vec != NULL);
jsmntok_t tokens[JSON_TOKENS_CAPACITY];
int tenant_config_vec_len = 0;
int i = 0;
/* Initialize the Jasmine Parser and an array to hold the tokens */
jsmn_parser module_parser;
jsmn_init(&module_parser);
/* Use Jasmine to parse the JSON */
int total_tokens = jsmn_parse(&module_parser, json_buf, json_buf_size, tokens, JSON_TOKENS_CAPACITY);
if (total_tokens < 0) {
if (total_tokens == JSMN_ERROR_INVAL) {
fprintf(stderr, "Error parsing %s: bad token, JSON string is corrupted\n", json_buf);
} else if (total_tokens == JSMN_ERROR_PART) {
fprintf(stderr, "Error parsing %s: JSON string is too short, expecting more JSON data\n",
json_buf);
} else if (total_tokens == JSMN_ERROR_NOMEM) {
/*
* According to the README at https://github.com/zserge/jsmn, this is a potentially recoverable
* error. More tokens can be allocated and jsmn_parse can be re-invoked.
*/
fprintf(stderr, "Error parsing %s: Not enough tokens, JSON string is too large\n", json_buf);
}
goto err;
}
i = tenant_config_vec_parse(tenant_config_vec, &tenant_config_vec_len, json_buf, tokens, i, total_tokens);
assert(i == total_tokens - 1);
done:
return tenant_config_vec_len;
json_parse_err:
free(*tenant_config_vec);
err:
fprintf(stderr, "JSON:\n%s\n", json_buf);
tenant_config_vec_len = -1;
goto done;
}

@ -12,7 +12,6 @@ extern pthread_t listener_thread_id;
void listener_thread_initialize(void);
noreturn void *listener_thread_main(void *dummy);
int listener_thread_register_module(struct module *mod);
/**
* Used to determine if running in the context of a listener thread

@ -8,8 +8,6 @@
#include "admissions_control.h"
#include "admissions_info.h"
#include "current_wasm_module_instance.h"
#include "http.h"
#include "module_config.h"
#include "panic.h"
#include "pool.h"
#include "sledge_abi_symbols.h"
@ -20,12 +18,11 @@
#include "wasm_memory.h"
#include "wasm_table.h"
#define MODULE_DEFAULT_REQUEST_RESPONSE_SIZE (PAGE_SIZE)
#define MODULE_MAX_NAME_LENGTH 32
#define MODULE_MAX_PATH_LENGTH 256
#define MODULE_MAX_ROUTE_LENGTH 256
#define MODULE_DATABASE_CAPACITY 128
extern thread_local int worker_thread_idx;
INIT_POOL(wasm_memory, wasm_memory_free)
@ -40,23 +37,8 @@ struct module_pools {
} __attribute__((aligned(CACHE_PAD)));
struct module {
/* Metadata from JSON Config */
char name[MODULE_MAX_NAME_LENGTH];
char path[MODULE_MAX_PATH_LENGTH];
char route[MODULE_MAX_ROUTE_LENGTH];
uint32_t stack_size; /* a specification? */
uint32_t relative_deadline_us;
struct admissions_info admissions_info;
uint64_t relative_deadline; /* cycles */
/* TCP State */
struct tcp_server tcp_server;
/* HTTP State */
size_t max_request_size;
size_t max_response_size;
char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH];
char *path;
uint32_t stack_size; /* a specification? */
/* Handle and ABI Symbols for *.so file */
struct sledge_abi_symbols abi;
@ -67,6 +49,13 @@ struct module {
struct module_pools pools[MAX_WORKER_THREADS];
};
/********************************
* Public Methods from module.c *
*******************************/
void module_free(struct module *module);
struct module *module_alloc(char *path);
/*************************
* Public Static Inlines *
************************/
@ -227,10 +216,12 @@ module_free_linear_memory(struct module *module, struct wasm_memory *memory)
wasm_memory_pool_add_nolock(&module->pools[worker_thread_idx].memory, memory);
}
/********************************
* Public Methods from module.c *
*******************************/
void module_free(struct module *module);
struct module *module_alloc(struct module_config *config);
int module_listen(struct module *module);
struct module_database {
struct module *modules[MODULE_DATABASE_CAPACITY];
size_t count;
};
int module_database_add(struct module_database *db, struct module *module);
struct module *module_database_find_by_path(struct module_database *db, char *path);
void module_database_init(struct module_database *db);

@ -1,40 +0,0 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
struct module_config {
char *name;
char *path;
char *route;
uint16_t port;
uint8_t admissions_percentile;
uint32_t expected_execution_us;
uint32_t relative_deadline_us;
uint32_t http_req_size;
uint32_t http_resp_size;
char *http_resp_content_type;
};
static inline void
module_config_deinit(struct module_config *config)
{
free(config->name);
free(config->path);
free(config->http_resp_content_type);
}
static inline void
print_module_config(struct module_config *config)
{
printf("Name: %s\n", config->name);
printf("Path: %s\n", config->path);
printf("Port: %u\n", config->port);
printf("admissions_percentile: %u\n", config->admissions_percentile);
printf("expected_execution_us: %u\n", config->expected_execution_us);
printf("relative_deadline_us: %u\n", config->relative_deadline_us);
printf("http_req_size: %u\n", config->http_req_size);
printf("http_resp_size: %u\n", config->http_resp_size);
printf("http_resp_content_type: %s\n", config->http_resp_content_type);
}

@ -2,9 +2,3 @@
#include "module.h"
#define MODULE_DATABASE_CAPACITY 128
int module_database_add(struct module *module);
struct module *module_database_find_by_name(char *name);
struct module *module_database_find_by_socket_descriptor(int socket_descriptor);
struct module *module_database_find_by_port(uint16_t port);

@ -0,0 +1,37 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
struct route_config {
char *path;
char *route;
uint8_t admissions_percentile;
uint32_t expected_execution_us;
uint32_t relative_deadline_us;
uint32_t http_req_size;
uint32_t http_resp_size;
char *http_resp_content_type;
};
static inline void
route_config_deinit(struct route_config *config)
{
free(config->path);
free(config->route);
free(config->http_resp_content_type);
}
static inline void
route_config_print(struct route_config *config)
{
printf("[Route] Route: %s\n", config->route);
printf("[Route] Path: %s\n", config->path);
printf("[Route] Admissions Percentile: %hhu\n", config->admissions_percentile);
printf("[Route] Expected Execution (us): %u\n", config->expected_execution_us);
printf("[Route] Relative Deadline (us): %u\n", config->relative_deadline_us);
printf("[Route] HTTP Request Size: %u\n", config->http_req_size);
printf("[Route] HTTP Response Size: %u\n", config->http_resp_size);
printf("[Route] HTTP Response Content Type: %s\n", config->http_resp_content_type);
}

@ -0,0 +1,106 @@
#pragma once
#include <stdio.h>
#include <string.h>
#include "json.h"
#include "route_config.h"
enum
{
route_config_json_key_http_path,
route_config_json_key_module_path,
route_config_json_key_admissions_percentile,
route_config_json_key_expected_execution_us,
route_config_json_key_relative_deadline_us,
route_config_json_key_http_req_size,
route_config_json_key_http_resp_size,
route_config_json_key_http_resp_content_type,
route_config_json_key_len
};
static const char *route_config_json_keys[route_config_json_key_len] = {
"route", "path", "admissions-percentile", "expected-execution-us", "relative-deadline-us",
"http-req-size", "http-resp-size", "http-resp-content-type"
};
static inline int
route_config_parse(struct route_config *config, const char *json_buf, jsmntok_t *tokens, size_t tokens_base,
int tokens_size)
{
int i = tokens_base;
char key[32] = { 0 };
if (!has_valid_type(tokens[i], "anonymous object in array", JSMN_OBJECT, json_buf)) return -1;
int route_keys_len = tokens[i].size;
if (tokens[i].size == 0) {
fprintf(stderr, "empty route object\n");
return -1;
}
for (int route_key_idx = 0; route_key_idx < route_keys_len; route_key_idx++) {
i++;
if (!is_valid_key(tokens[i])) return -1;
sprintf(key, "%.*s", tokens[i].end - tokens[i].start, json_buf + tokens[i].start);
/* Advance to Value */
i++;
if (strcmp(key, route_config_json_keys[route_config_json_key_http_path]) == 0) {
if (!is_nonempty_string(tokens[i], key)) return -1;
config->route = strndup(json_buf + tokens[i].start, tokens[i].end - tokens[i].start);
} else if (strcmp(key, route_config_json_keys[route_config_json_key_module_path]) == 0) {
if (!is_nonempty_string(tokens[i], key)) return -1;
config->path = strndup(json_buf + tokens[i].start, tokens[i].end - tokens[i].start);
} else if (strcmp(key, route_config_json_keys[route_config_json_key_admissions_percentile]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE, json_buf)) return -1;
int rc = parse_uint8_t(tokens[i], json_buf,
route_config_json_keys[route_config_json_key_admissions_percentile],
&config->admissions_percentile);
if (rc < 0) return -1;
} else if (strcmp(key, route_config_json_keys[route_config_json_key_expected_execution_us]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE, json_buf)) return -1;
int rc = parse_uint32_t(tokens[i], json_buf,
route_config_json_keys[route_config_json_key_expected_execution_us],
&config->expected_execution_us);
if (rc < 0) return -1;
} else if (strcmp(key, route_config_json_keys[route_config_json_key_relative_deadline_us]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE, json_buf)) return -1;
int rc = parse_uint32_t(tokens[i], json_buf,
route_config_json_keys[route_config_json_key_relative_deadline_us],
&config->relative_deadline_us);
if (rc < 0) return -1;
} else if (strcmp(key, route_config_json_keys[route_config_json_key_http_req_size]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE, json_buf)) return -1;
int rc = parse_uint32_t(tokens[i], json_buf,
route_config_json_keys[route_config_json_key_http_req_size],
&config->http_req_size);
if (rc < 0) return -1;
} else if (strcmp(key, route_config_json_keys[route_config_json_key_http_resp_size]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE, json_buf)) return -1;
int rc = parse_uint32_t(tokens[i], json_buf,
route_config_json_keys[route_config_json_key_http_resp_size],
&config->http_resp_size);
if (rc < 0) return -1;
} else if (strcmp(key, route_config_json_keys[route_config_json_key_http_resp_content_type]) == 0) {
if (!is_nonempty_string(tokens[i], key)) return -1;
config->http_resp_content_type = strndup(json_buf + tokens[i].start,
tokens[i].end - tokens[i].start);
} else {
fprintf(stderr, "%s is not a valid key\n", key);
return -1;
}
}
return i;
}

@ -5,14 +5,15 @@
#include <stdint.h>
#include "panic.h"
#include "tenant.h"
#include "sandbox_types.h"
/***************************
* Public API *
**************************/
struct sandbox *sandbox_alloc(struct module *module, struct http_session *session, uint64_t request_arrival_timestamp,
uint64_t admissions_estimate);
struct sandbox *sandbox_alloc(struct module *module, struct http_session *session, struct route *route,
struct tenant *tenant, uint64_t request_arrival_timestamp, uint64_t admissions_estimate);
int sandbox_prepare_execution_environment(struct sandbox *sandbox);
void sandbox_free(struct sandbox *sandbox);
void sandbox_main(struct sandbox *sandbox);

@ -36,16 +36,16 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox)
* becomes more intelligent, then peak linear memory size needs to be tracked
* seperately from current linear memory size.
*/
fprintf(sandbox_perf_log, "%lu,%s,%d,%s,%lu,%lu,%lu,,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n",
sandbox->id, sandbox->module->name, sandbox->module->tcp_server.port,
sandbox_state_stringify(sandbox->state), sandbox->module->relative_deadline, sandbox->total_time,
queued_duration, sandbox->duration_of_state[SANDBOX_UNINITIALIZED],
sandbox->duration_of_state[SANDBOX_ALLOCATED], sandbox->duration_of_state[SANDBOX_INITIALIZED],
sandbox->duration_of_state[SANDBOX_RUNNABLE], sandbox->duration_of_state[SANDBOX_INTERRUPTED],
sandbox->duration_of_state[SANDBOX_PREEMPTED], sandbox->duration_of_state[SANDBOX_RUNNING_SYS],
sandbox->duration_of_state[SANDBOX_RUNNING_USER], sandbox->duration_of_state[SANDBOX_ASLEEP],
sandbox->duration_of_state[SANDBOX_RETURNED], sandbox->duration_of_state[SANDBOX_COMPLETE],
sandbox->duration_of_state[SANDBOX_ERROR], runtime_processor_speed_MHz);
fprintf(sandbox_perf_log, "%lu,%s,%s,%s,%lu,%lu,%lu,,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n",
sandbox->id, sandbox->tenant->name, sandbox->module->path, sandbox_state_stringify(sandbox->state),
sandbox->route->relative_deadline, sandbox->total_time, queued_duration,
sandbox->duration_of_state[SANDBOX_UNINITIALIZED], sandbox->duration_of_state[SANDBOX_ALLOCATED],
sandbox->duration_of_state[SANDBOX_INITIALIZED], sandbox->duration_of_state[SANDBOX_RUNNABLE],
sandbox->duration_of_state[SANDBOX_INTERRUPTED], sandbox->duration_of_state[SANDBOX_PREEMPTED],
sandbox->duration_of_state[SANDBOX_RUNNING_SYS], sandbox->duration_of_state[SANDBOX_RUNNING_USER],
sandbox->duration_of_state[SANDBOX_ASLEEP], sandbox->duration_of_state[SANDBOX_RETURNED],
sandbox->duration_of_state[SANDBOX_COMPLETE], sandbox->duration_of_state[SANDBOX_ERROR],
runtime_processor_speed_MHz);
}
static inline void

@ -47,8 +47,8 @@ sandbox_set_as_complete(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox_state_totals_decrement(last_state);
/* Admissions Control Post Processing */
admissions_info_update(&sandbox->module->admissions_info, sandbox->duration_of_state[SANDBOX_RUNNING_USER]
+ sandbox->duration_of_state[SANDBOX_RUNNING_SYS]);
admissions_info_update(&sandbox->route->admissions_info, sandbox->duration_of_state[SANDBOX_RUNNING_USER]
+ sandbox->duration_of_state[SANDBOX_RUNNING_SYS]);
admissions_control_subtract(sandbox->admissions_estimate);
/* Terminal State Logging */

@ -1,9 +1,6 @@
#pragma once
#include <stdbool.h>
#include <stdint.h>
#include <ucontext.h>
#include <unistd.h>
#include "arch/context.h"
#include "http_session.h"
@ -11,6 +8,7 @@
#include "ps_list.h"
#include "sandbox_state.h"
#include "sandbox_state_history.h"
#include "tenant.h"
#include "wasm_memory.h"
#include "wasm_types.h"
#include "wasm_stack.h"
@ -40,6 +38,10 @@ struct sandbox {
struct ps_list list; /* used by ps_list's default name-based MACROS for the scheduling runqueue */
/* Accounting Info */
struct route *route;
struct tenant *tenant;
/* HTTP State */
struct http_session *http;

@ -20,6 +20,7 @@
#include "sandbox_set_as_interrupted.h"
#include "sandbox_set_as_running_user.h"
#include "scheduler_execute_epoll_loop.h"
#include "scheduler_options.h"
/**
@ -60,13 +61,6 @@
* initialize a sandbox.
*/
enum SCHEDULER
{
SCHEDULER_FIFO = 0,
SCHEDULER_EDF = 1
};
extern enum SCHEDULER scheduler;
static inline struct sandbox *
scheduler_edf_get_next()

@ -0,0 +1,9 @@
#pragma once
enum SCHEDULER
{
SCHEDULER_FIFO = 0,
SCHEDULER_EDF = 1
};
extern enum SCHEDULER scheduler;

@ -0,0 +1,14 @@
#pragma once
#include "http_router.h"
#include "module.h"
#include "tcp_server.h"
#define TENANT_DATABASE_CAPACITY 128
struct tenant {
char *name;
struct tcp_server tcp_server;
struct http_router router;
struct module_database module_db;
};

@ -0,0 +1,34 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "route_config.h"
struct tenant_config {
char *name;
uint16_t port;
struct route_config *routes;
size_t routes_len;
};
static inline void
tenant_config_deinit(struct tenant_config *config)
{
if (config->name != NULL) free(config->name);
config->name = NULL;
for (int i = 0; i < config->routes_len; i++) { route_config_deinit(&config->routes[i]); }
free(config->routes);
config->routes = NULL;
config->routes_len = 0;
}
static inline void
tenant_config_print(struct tenant_config *config)
{
printf("[Tenant] Name: %s\n", config->name);
printf("[Tenant] Path: %d\n", config->port);
printf("[Tenant] Routes Size: %zu\n", config->routes_len);
for (int i = 0; i < config->routes_len; i++) { route_config_print(&config->routes[i]); }
}

@ -0,0 +1,117 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "json.h"
#include "route_config_parse.h"
#include "tenant_config.h"
enum
{
tenant_config_json_key_name,
tenant_config_json_key_port,
tenant_config_json_key_routes,
tenant_config_json_key_len
};
static const char *tenant_config_json_keys[tenant_config_json_key_len] = { "name", "port", "routes" };
/* Tenant Config */
static inline int
tenant_config_parse(struct tenant_config *config, const char *json_buf, jsmntok_t *tokens, size_t tokens_base,
int tokens_size)
{
int i = tokens_base;
char key[32] = { 0 };
if (!has_valid_type(tokens[i], "Anonymous Tenant Config Object", JSMN_OBJECT, json_buf)) return -1;
if (!is_nonempty_object(tokens[i], "Anonymous Tenant Config Object")) return -1;
int tenant_key_count = tokens[i].size;
for (int tenant_key_idx = 0; tenant_key_idx < tenant_key_count; tenant_key_idx++) {
/* Advance to key */
i++;
if (!is_valid_key(tokens[i])) return -1;
if (!has_valid_size(tokens[i], key, 1)) return -1;
/* Copy Key */
sprintf(key, "%.*s", tokens[i].end - tokens[i].start, json_buf + tokens[i].start);
/* Advance to Value */
i++;
if (strcmp(key, tenant_config_json_keys[tenant_config_json_key_name]) == 0) {
if (!is_nonempty_string(tokens[i], key)) return -1;
config->name = strndup(json_buf + tokens[i].start, tokens[i].end - tokens[i].start);
} else if (strcmp(key, tenant_config_json_keys[tenant_config_json_key_port]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_PRIMITIVE, json_buf)) return -1;
int rc = parse_uint16_t(tokens[i], json_buf,
tenant_config_json_keys[tenant_config_json_key_port], &config->port);
if (rc < 0) return -1;
} else if (strcmp(key, tenant_config_json_keys[tenant_config_json_key_routes]) == 0) {
if (!has_valid_type(tokens[i], key, JSMN_ARRAY, json_buf)) return -1;
int routes_len = tokens[i].size;
config->routes_len = routes_len;
config->routes = (struct route_config *)calloc(routes_len, sizeof(struct route_config));
for (int route_idx = 0; route_idx < routes_len; route_idx++) {
/* Advance to object */
i++;
i = route_config_parse(&(config->routes)[route_idx], json_buf, tokens, i, tokens_size);
if (i == -1) return -1;
}
} else {
fprintf(stderr, "%s is not a valid key\n", key);
return -1;
}
}
return i;
}
/* Tenant Config Vec */
static inline int
tenant_config_vec_parse(struct tenant_config **tenant_config_vec, int *tenant_config_vec_len, const char *json_buf,
jsmntok_t *tokens, size_t tokens_base, int tokens_size)
{
int i = tokens_base;
if (tokens[i].type != JSMN_ARRAY) {
fprintf(stderr, "Outermost Config should be a JSON array, was a JSON %s\n", jsmn_type(tokens[0].type));
return -1;
}
*tenant_config_vec_len = tokens[i].size;
if (tenant_config_vec_len == 0) {
fprintf(stderr, "Config is an empty JSON array\n");
return -1;
}
*tenant_config_vec = (struct tenant_config *)calloc((size_t)(*tenant_config_vec_len),
sizeof(struct tenant_config));
if (*tenant_config_vec == NULL) {
perror("Failed to allocate vec");
return -1;
}
for (int tenant_idx = 0; tenant_idx < *tenant_config_vec_len; tenant_idx++) {
i++;
i = tenant_config_parse(&((*tenant_config_vec)[tenant_idx]), json_buf, tokens, i, tokens_size);
if (i == -1) return -1;
}
return i;
}

@ -0,0 +1,102 @@
#pragma once
#include <stdint.h>
#include <string.h>
#include "admissions_info.h"
#include "http.h"
#include "listener_thread.h"
#include "panic.h"
#include "scheduler_options.h"
#include "tenant.h"
#include "tenant_config.h"
int tenant_database_add(struct tenant *tenant);
struct tenant *tenant_database_find_by_name(char *name);
struct tenant *tenant_database_find_by_socket_descriptor(int socket_descriptor);
struct tenant *tenant_database_find_by_port(uint16_t port);
static inline struct tenant *
tenant_alloc(struct tenant_config *config)
{
/* Validate config */
if (strlen(config->name) == 0) panic("name field is required\n");
if (config->port == 0) panic("port field is required\n");
if (config->routes_len == 0) panic("one or more routesa are required\n");
struct tenant *existing_tenant = tenant_database_find_by_name(config->name);
if (existing_tenant != NULL) panic("Tenant %s is already initialized\n", existing_tenant->name);
existing_tenant = tenant_database_find_by_port(config->port);
if (existing_tenant != NULL)
panic("Tenant %s is already configured with port %u\n", existing_tenant->name, config->port);
for (int i = 0; i < config->routes_len; i++) {
struct route_config *route_config = &config->routes[i];
if (route_config->path == 0) panic("path field is required\n");
if (route_config->route == 0) panic("route field is required\n");
if (route_config->http_req_size > RUNTIME_HTTP_REQUEST_SIZE_MAX)
panic("request_size must be between 0 and %u, was %u\n",
(uint32_t)RUNTIME_HTTP_REQUEST_SIZE_MAX, route_config->http_req_size);
if (route_config->http_resp_size > RUNTIME_HTTP_RESPONSE_SIZE_MAX)
panic("response-size must be between 0 and %u, was %u\n",
(uint32_t)RUNTIME_HTTP_RESPONSE_SIZE_MAX, route_config->http_resp_size);
if (route_config->relative_deadline_us > (uint32_t)RUNTIME_RELATIVE_DEADLINE_US_MAX)
panic("Relative-deadline-us must be between 0 and %u, was %u\n",
(uint32_t)RUNTIME_RELATIVE_DEADLINE_US_MAX, route_config->relative_deadline_us);
#ifdef ADMISSIONS_CONTROL
/* expected-execution-us and relative-deadline-us are required in case of admissions control */
if (route_config->expected_execution_us == 0) panic("expected-execution-us is required\n");
if (route_config->relative_deadline_us == 0) panic("relative_deadline_us is required\n");
if (route_config->admissions_percentile > 99 || route_config->admissions_percentile < 50)
panic("admissions-percentile must be > 50 and <= 99 but was %u\n",
route_config->admissions_percentile);
/* If the ratio is too big, admissions control is too coarse */
uint32_t ratio = route_config->relative_deadline_us / route_config->expected_execution_us;
if (ratio > ADMISSIONS_CONTROL_GRANULARITY)
panic("Ratio of Deadline to Execution time cannot exceed admissions control "
"granularity of "
"%d\n",
ADMISSIONS_CONTROL_GRANULARITY);
#else
/* relative-deadline-us is required if scheduler is EDF */
if (scheduler == SCHEDULER_EDF && route_config->relative_deadline_us == 0)
panic("relative_deadline_us is required\n");
#endif
}
struct tenant *tenant = (struct tenant *)calloc(1, sizeof(struct tenant));
/* Move name */
tenant->name = config->name;
config->name = NULL;
tcp_server_init(&tenant->tcp_server, config->port);
http_router_init(&tenant->router);
module_database_init(&tenant->module_db);
for (int i = 0; i < config->routes_len; i++) {
/* Resolve module */
struct module *module = module_database_find_by_path(&tenant->module_db, config->routes[i].path);
assert(module != NULL);
http_router_add_route(&tenant->router, &config->routes[i], module);
}
return tenant;
}
/**
* Start the tenant as a server listening at tenant->port
* @param tenant
* @returns 0 on success, -1 on error
*/
int tenant_listen(struct tenant *tenant);
int listener_thread_register_tenant(struct tenant *tenant);

@ -85,9 +85,27 @@
free(vec); \
} \
\
static inline int vec_##TYPE##_resize(struct vec_##TYPE *vec, size_t capacity) \
{ \
if (vec->capacity != capacity) { \
TYPE *temp = (TYPE *)realloc(vec->buffer, sizeof(TYPE) * capacity); \
if (temp == NULL) return -1; \
vec->buffer = temp; \
vec->capacity = capacity; \
} \
return 0; \
} \
\
\
static inline int vec_##TYPE##_grow(struct vec_##TYPE *vec) \
{ \
size_t capacity = vec->capacity == 0 ? 1 : vec->capacity * 2; \
return vec_##TYPE##_resize(vec, capacity); \
} \
\
static inline int vec_##TYPE##_insert(struct vec_##TYPE *vec, uint32_t idx, TYPE value) \
{ \
if (idx >= vec->capacity) return -1; \
if (idx >= vec->capacity) vec_##TYPE##_grow(vec); \
\
vec->buffer[idx] = value; \
return 0; \

@ -5,7 +5,6 @@
#include <sys/mman.h>
#include <string.h>
#include "sandbox_types.h"
#include "types.h"
/**

@ -146,7 +146,7 @@ current_sandbox_init()
/* Initialize Arguments. First arg is the module name. Subsequent args are query parameters */
char *args[HTTP_MAX_QUERY_PARAM_COUNT + 1];
args[0] = sandbox->module->name;
args[0] = sandbox->module->path;
for (int i = 0; i < sandbox->http->http_request.query_params_count; i++)
args[i + 1] = (char *)sandbox->http->http_request.query_params[i].value;
@ -185,7 +185,7 @@ current_sandbox_fini()
sandbox->total_time = sandbox->timestamp_of.completion - sandbox->timestamp_of.request_arrival;
/* Retrieve the result, construct the HTTP response, and send to client */
if (http_session_send_response(sandbox->http, sandbox->module->response_content_type, current_sandbox_sleep)
if (http_session_send_response(sandbox->http, sandbox->route->response_content_type, current_sandbox_sleep)
< 0) {
error_message = "Unable to build and send client response\n";
goto err;

@ -790,7 +790,7 @@ wasi_snapshot_preview1_backing_fd_write(wasi_context_t *context, __wasi_fd_t fd,
__wasi_size_t sum = 0;
for (size_t i = 0; i < iovs_len; i++) {
buffer_remaining = s->module->max_response_size - s->http->response.length;
buffer_remaining = s->http->response.capacity - s->http->response.length;
if (buffer_remaining == 0) {
*nwritten_retptr = s->http->response.length - old_response_len;
return __WASI_ERRNO_FBIG;

@ -45,21 +45,21 @@ listener_thread_initialize(void)
}
/**
* @brief Registers a serverless module on the listener thread's epoll descriptor
* @brief Registers a serverless tenant on the listener thread's epoll descriptor
**/
int
listener_thread_register_module(struct module *mod)
listener_thread_register_tenant(struct tenant *tenant)
{
assert(mod != NULL);
assert(tenant != NULL);
if (unlikely(listener_thread_epoll_file_descriptor == 0)) {
panic("Attempting to register a module before listener thread initialization");
panic("Attempting to register a tenant before listener thread initialization");
}
int rc = 0;
struct epoll_event accept_evt;
accept_evt.data.ptr = (void *)mod;
accept_evt.data.ptr = (void *)tenant;
accept_evt.events = EPOLLIN;
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, mod->tcp_server.socket_descriptor,
rc = epoll_ctl(listener_thread_epoll_file_descriptor, EPOLL_CTL_ADD, tenant->tcp_server.socket_descriptor,
&accept_evt);
return rc;
@ -119,9 +119,9 @@ listener_thread_main(void *dummy)
*/
assert((epoll_events[i].events & EPOLLIN) == EPOLLIN);
/* Unpack module from epoll event */
struct module *module = (struct module *)epoll_events[i].data.ptr;
assert(module);
/* Unpack tenant from epoll event */
struct tenant *tenant = (struct tenant *)epoll_events[i].data.ptr;
assert(tenant);
/*
* I don't think we're responsible to cleanup epoll events, but clearing to trigger
@ -139,7 +139,7 @@ listener_thread_main(void *dummy)
* reason
*/
while (true) {
int client_socket = accept4(module->tcp_server.socket_descriptor,
int client_socket = accept4(tenant->tcp_server.socket_descriptor,
(struct sockaddr *)&client_address, &address_length,
SOCK_NONBLOCK);
if (unlikely(client_socket < 0)) {
@ -160,15 +160,14 @@ listener_thread_main(void *dummy)
*/
if (address_length > sizeof(client_address)) {
debuglog("Client address %s truncated because buffer was too small\n",
module->name);
tenant->name);
}
http_total_increment_request();
/* Allocate HTTP Session */
struct http_session *session =
http_session_alloc(module->max_request_size, module->max_response_size, client_socket,
(const struct sockaddr *)&client_address);
http_session_alloc(client_socket, (const struct sockaddr *)&client_address);
/* Read HTTP request */
int rc = 0;
@ -186,8 +185,9 @@ listener_thread_main(void *dummy)
continue;
}
if (strncmp(session->http_request.full_url, module->route, strlen(module->route))
!= 0) {
struct route *route = http_router_match_route(&tenant->router,
session->http_request.full_url);
if (route == NULL) {
http_session_send_err_oneshot(session, 404);
http_session_close(session);
continue;
@ -197,10 +197,11 @@ listener_thread_main(void *dummy)
/*
* Perform admissions control.
* If 0, workload was rejected, so close with 429 "Too Many Requests" and continue
* If 0, workload was rejected, so close with 429 "Too Many Requests"
and continue
* TODO: Consider providing a Retry-After header
*/
uint64_t work_admitted = admissions_control_decide(module->admissions_info.estimate);
uint64_t work_admitted = admissions_control_decide(route->admissions_info.estimate);
if (work_admitted == 0) {
tcp_session_send_oneshot(client_socket, http_header_build(429),
http_header_len(429));
@ -210,14 +211,15 @@ listener_thread_main(void *dummy)
}
/* Allocate a Sandbox */
struct sandbox *sandbox = sandbox_alloc(module, session, request_arrival_timestamp,
work_admitted);
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, tenant,
request_arrival_timestamp, work_admitted);
if (unlikely(sandbox == NULL)) {
http_session_send_err_oneshot(sandbox->http, 503);
http_session_close(sandbox->http);
}
/* If the global request scheduler is full, return a 429 to the client */
/* If the global request scheduler is full, return a 429 to the client
*/
sandbox = global_request_scheduler_add(sandbox);
if (unlikely(sandbox == NULL)) {
http_session_send_err_oneshot(sandbox->http, 429);

@ -15,17 +15,16 @@
#include <sys/fcntl.h>
#endif
#include "json.h"
#include "json_parse.h"
#include "pretty_print.h"
#include "debuglog.h"
#include "listener_thread.h"
#include "module.h"
#include "module_database.h"
#include "panic.h"
#include "runtime.h"
#include "sandbox_types.h"
#include "scheduler.h"
#include "software_interrupt.h"
#include "tenant_functions.h"
#include "worker_thread.h"
/* Conditionally used by debuglog when NDEBUG is not set */
@ -49,7 +48,7 @@ uint32_t runtime_quantum_us = 5000; /* 5ms */
static void
runtime_usage(char *cmd)
{
printf("%s <modules_file>\n", cmd);
printf("%s <spec.json>\n", cmd);
}
/**
@ -288,12 +287,6 @@ log_compiletime_config()
pretty_print_key_disabled("Log Lock Overhead");
#endif
#ifdef LOG_MODULE_LOADING
pretty_print_key_enabled("Log Module Loading");
#else
pretty_print_key_disabled("Log Module Loading");
#endif
#ifdef LOG_PREEMPTION
pretty_print_key_enabled("Log Preemption");
#else
@ -372,8 +365,8 @@ load_file_into_buffer(const char *file_name, char **file_buffer)
}
/* Open the file */
FILE *module_file = fopen(file_name, "r");
if (!module_file) {
FILE *file = fopen(file_name, "r");
if (!file) {
fprintf(stderr, "Attempt to open %s failed: %s\n", file_name, strerror(errno));
goto err;
}
@ -386,21 +379,18 @@ load_file_into_buffer(const char *file_name, char **file_buffer)
}
/* Read the file into the buffer and check that the buffer size equals the file size */
size_t total_chars_read = fread(*file_buffer, sizeof(char), stat_buffer.st_size, module_file);
#ifdef LOG_MODULE_LOADING
debuglog("size read: %d content: %s\n", total_chars_read, *file_buffer);
#endif
size_t total_chars_read = fread(*file_buffer, sizeof(char), stat_buffer.st_size, file);
if (total_chars_read != stat_buffer.st_size) {
fprintf(stderr, "Attempt to read %s into buffer failed: %s\n", file_name, strerror(errno));
goto fread_err;
}
/* Close the file */
if (fclose(module_file) == EOF) {
if (fclose(file) == EOF) {
fprintf(stderr, "Attempt to close buffer containing %s failed: %s\n", file_name, strerror(errno));
goto fclose_err;
};
module_file = NULL;
file = NULL;
return total_chars_read;
@ -410,8 +400,8 @@ fread_err:
free(*file_buffer);
stat_buffer_alloc_err:
// Check to ensure we haven't already close this
if (module_file != NULL) {
if (fclose(module_file) == EOF) panic("Failed to close file\n");
if (file != NULL) {
if (fclose(file) == EOF) panic("Failed to close file\n");
}
err:
return 0;
@ -450,47 +440,43 @@ main(int argc, char **argv)
runtime_start_runtime_worker_threads();
software_interrupt_arm_timer();
#ifdef LOG_MODULE_LOADING
debuglog("Parsing modules file [%s]\n", argv[1]);
#endif
const char *json_path = argv[1];
char *json_buf = NULL;
size_t json_buf_len = load_file_into_buffer(json_path, &json_buf);
if (unlikely(json_buf_len == 0)) panic("failed to initialize module(s) defined in %s\n", json_path);
if (unlikely(json_buf_len == 0)) panic("failed to load data from %s\n", json_path);
struct module_config *module_config_vec;
struct tenant_config *tenant_config_vec;
int module_config_vec_len = parse_json(json_buf, json_buf_len, &module_config_vec);
if (module_config_vec_len < 0) { exit(-1); }
int tenant_config_vec_len = parse_json(json_buf, json_buf_len, &tenant_config_vec);
if (tenant_config_vec_len < 0) { exit(-1); }
free(json_buf);
for (int module_idx = 0; module_idx < module_config_vec_len; module_idx++) {
/* Automatically calls listen */
struct module *module = module_alloc(&module_config_vec[module_idx]);
if (unlikely(module == NULL)) panic("failed to initialize module(s) defined in %s\n", json_path);
// for (int i = 0; i < tenant_config_vec_len; i++) { tenant_config_print(&tenant_config_vec[i]); }
int rc = module_database_add(module);
for (int tenant_idx = 0; tenant_idx < tenant_config_vec_len; tenant_idx++) {
struct tenant *tenant = tenant_alloc(&tenant_config_vec[tenant_idx]);
int rc = tenant_database_add(tenant);
if (rc < 0) {
panic("Module database full!\n");
panic("Tenant database full!\n");
exit(-1);
}
/* Start listening for requests */
rc = module_listen(module);
rc = tenant_listen(tenant);
if (rc < 0) exit(-1);
}
for (int module_idx = 0; module_idx < module_config_vec_len; module_idx++) {
module_config_deinit(&module_config_vec[module_idx]);
}
free(module_config_vec);
for (int i = 0; i < runtime_worker_threads_count; i++) {
int ret = pthread_join(runtime_worker_threads[i], NULL);
if (ret) {
errno = ret;
perror("pthread_join");
exit(-1);
for (int tenant_idx = 0; tenant_idx < tenant_config_vec_len; tenant_idx++) {
tenant_config_deinit(&tenant_config_vec[tenant_idx]);
}
free(tenant_config_vec);
for (int i = 0; i < runtime_worker_threads_count; i++) {
int ret = pthread_join(runtime_worker_threads[i], NULL);
if (ret) {
errno = ret;
perror("pthread_join");
exit(-1);
}
}
}

@ -17,103 +17,32 @@
#include "tcp_server.h"
#include "wasm_table.h"
#define IN
/*************************
* Private Static Inline *
************************/
static inline int
module_init(struct module *module, struct module_config *config)
module_init(struct module *module, IN char *path)
{
assert(module != NULL);
assert(config != NULL);
assert(config->name != NULL);
assert(config->path != NULL);
assert(config->http_resp_content_type != NULL);
assert(path != NULL);
assert(strlen(path) > 0);
uint32_t stack_size = 0;
/* Validate presence of required fields */
if (strlen(config->name) == 0) panic("name field is required\n");
if (strlen(config->path) == 0) panic("path field is required\n");
if (config->port == 0) panic("port field is required\n");
if (config->http_req_size > RUNTIME_HTTP_REQUEST_SIZE_MAX)
panic("request_size must be between 0 and %u, was %u\n", (uint32_t)RUNTIME_HTTP_REQUEST_SIZE_MAX,
config->http_req_size);
if (config->http_resp_size > RUNTIME_HTTP_RESPONSE_SIZE_MAX)
panic("response-size must be between 0 and %u, was %u\n", (uint32_t)RUNTIME_HTTP_RESPONSE_SIZE_MAX,
config->http_resp_size);
/* If a route is not specified, default to root */
if (config->route == NULL) config->route = "/";
struct module *existing_module = module_database_find_by_name(config->name);
if (existing_module != NULL) panic("Module %s is already initialized\n", existing_module->name);
existing_module = module_database_find_by_port(config->port);
if (existing_module != NULL)
panic("Module %s is already configured with port %u\n", existing_module->name, config->port);
if (config->relative_deadline_us > (uint32_t)RUNTIME_RELATIVE_DEADLINE_US_MAX)
panic("Relative-deadline-us must be between 0 and %u, was %u\n",
(uint32_t)RUNTIME_RELATIVE_DEADLINE_US_MAX, config->relative_deadline_us);
#ifdef ADMISSIONS_CONTROL
/* expected-execution-us and relative-deadline-us are required in case of admissions control */
if (config->expected_execution_us == 0) panic("expected-execution-us is required\n");
if (config->relative_deadline_us == 0) panic("relative_deadline_us is required\n");
if (config->admissions_percentile > 99 || config->admissions_percentile < 50)
panic("admissions-percentile must be > 50 and <= 99 but was %u\n", config->admissions_percentile);
/* If the ratio is too big, admissions control is too coarse */
uint32_t ratio = config->relative_deadline_us / config->expected_execution_us;
if (ratio > ADMISSIONS_CONTROL_GRANULARITY)
panic("Ratio of Deadline to Execution time cannot exceed admissions control "
"granularity of "
"%d\n",
ADMISSIONS_CONTROL_GRANULARITY);
#else
/* relative-deadline-us is required if scheduler is EDF */
if (scheduler == SCHEDULER_EDF && config->relative_deadline_us == 0)
panic("relative_deadline_us is required\n");
#endif
int rc = 0;
atomic_init(&module->reference_count, 0);
rc = sledge_abi_symbols_init(&module->abi, config->path);
rc = sledge_abi_symbols_init(&module->abi, path);
if (rc != 0) goto err;
/* Set fields in the module struct */
strncpy(module->name, config->name, MODULE_MAX_NAME_LENGTH);
strncpy(module->path, config->path, MODULE_MAX_PATH_LENGTH);
strncpy(module->route, config->route, MODULE_MAX_ROUTE_LENGTH);
strncpy(module->response_content_type, config->http_resp_content_type, HTTP_MAX_HEADER_VALUE_LENGTH);
module->path = path;
module->stack_size = ((uint32_t)(round_up_to_page(stack_size == 0 ? WASM_STACK_SIZE : stack_size)));
tcp_server_init(&module->tcp_server, config->port);
/* Deadlines */
module->relative_deadline_us = config->relative_deadline_us;
/* This can overflow a uint32_t, so be sure to cast appropriately */
module->relative_deadline = (uint64_t)config->relative_deadline_us * runtime_processor_speed_MHz;
/* Admissions Control */
uint64_t expected_execution = (uint64_t)config->expected_execution_us * runtime_processor_speed_MHz;
admissions_info_initialize(&module->admissions_info, config->admissions_percentile, expected_execution,
module->relative_deadline);
/* Request Response Buffer */
if (config->http_req_size == 0) config->http_req_size = MODULE_DEFAULT_REQUEST_RESPONSE_SIZE;
if (config->http_resp_size == 0) config->http_resp_size = MODULE_DEFAULT_REQUEST_RESPONSE_SIZE;
module->max_request_size = round_up_to_page(config->http_req_size);
module->max_response_size = round_up_to_page(config->http_resp_size);
module_alloc_table(module);
module_initialize_pools(module);
done:
@ -127,31 +56,7 @@ err:
* Public Methods
***************************************/
/**
* Start the module as a server listening at module->port
* @param module
* @returns 0 on success, -1 on error
*/
int
module_listen(struct module *module)
{
int rc = tcp_server_listen(&module->tcp_server);
if (rc < 0) goto err;
/* Set the socket descriptor and register with our global epoll instance to monitor for incoming HTTP
requests */
rc = listener_thread_register_module(module);
if (unlikely(rc < 0)) goto err_add_to_epoll;
rc = 0;
done:
return rc;
err_add_to_epoll:
tcp_server_close(&module->tcp_server);
err:
rc = -1;
goto done;
}
/**
* Module Mega Teardown Function
@ -170,7 +75,7 @@ module_free(struct module *module)
/* Do not free if we still have oustanding references */
if (module->reference_count) return;
tcp_server_close(&module->tcp_server);
// tcp_server_close(&module->tcp_server);
sledge_abi_symbols_deinit(&module->abi);
free(module);
}
@ -189,7 +94,7 @@ module_free(struct module *module)
*/
struct module *
module_alloc(struct module_config *config)
module_alloc(char *path)
{
struct module *module = (struct module *)calloc(1, sizeof(struct module));
if (!module) {
@ -197,7 +102,7 @@ module_alloc(struct module_config *config)
goto err;
};
int rc = module_init(module, config);
int rc = module_init(module, path);
if (rc < 0) goto init_err;
done:

@ -7,8 +7,11 @@
* Module Database *
******************/
struct module *module_database[MODULE_DATABASE_CAPACITY] = { NULL };
size_t module_database_count = 0;
void
module_database_init(struct module_database *db)
{
db->count = 0;
}
/**
* Adds a module to the in-memory module DB
@ -16,14 +19,14 @@ size_t module_database_count = 0;
* @return 0 on success. -ENOSPC when full
*/
int
module_database_add(struct module *module)
module_database_add(struct module_database *db, struct module *module)
{
assert(module_database_count <= MODULE_DATABASE_CAPACITY);
assert(db->count <= MODULE_DATABASE_CAPACITY);
int rc;
if (module_database_count == MODULE_DATABASE_CAPACITY) goto err_no_space;
module_database[module_database_count++] = module;
if (db->count == MODULE_DATABASE_CAPACITY) goto err_no_space;
db->modules[db->count++] = module;
rc = 0;
done:
@ -36,46 +39,23 @@ err_no_space:
/**
* Given a name, find the associated module
* Given a path, find the associated module
* @param name
* @return module or NULL if no match found
*/
struct module *
module_database_find_by_name(char *name)
module_database_find_by_path(struct module_database *db, char *path)
{
for (size_t i = 0; i < module_database_count; i++) {
assert(module_database[i]);
if (strcmp(module_database[i]->name, name) == 0) return module_database[i];
for (size_t i = 0; i < db->count; i++) {
assert(db->modules[i]);
if (strcmp(db->modules[i]->path, path) == 0) return db->modules[i];
}
return NULL;
}
/**
* Given a socket_descriptor, find the associated module
* @param socket_descriptor
* @return module or NULL if no match found
*/
struct module *
module_database_find_by_socket_descriptor(int socket_descriptor)
{
for (size_t i = 0; i < module_database_count; i++) {
assert(module_database[i]);
if (module_database[i]->tcp_server.socket_descriptor == socket_descriptor) return module_database[i];
struct module *module = module_alloc(path);
if (module != NULL) {
module_database_add(db, module);
return module;
}
return NULL;
}
/**
* Given a port, find the associated module
* @param port
* @return module or NULL if no match found
*/
struct module *
module_database_find_by_port(uint16_t port)
{
for (size_t i = 0; i < module_database_count; i++) {
assert(module_database[i]);
if (module_database[i]->tcp_server.port == port) return module_database[i];
}
return NULL;
}

@ -132,8 +132,8 @@ err_http_allocation_failed:
}
void
sandbox_init(struct sandbox *sandbox, struct module *module, struct http_session *session,
uint64_t request_arrival_timestamp, uint64_t admissions_estimate)
sandbox_init(struct sandbox *sandbox, struct module *module, struct http_session *session, struct route *route,
struct tenant *tenant, uint64_t request_arrival_timestamp, uint64_t admissions_estimate)
{
/* Sets the ID to the value before the increment */
sandbox->id = sandbox_total_postfix_increment();
@ -145,10 +145,12 @@ sandbox_init(struct sandbox *sandbox, struct module *module, struct http_session
/* Allocate HTTP session structure */
assert(session);
sandbox->http = session;
sandbox->http = session;
sandbox->tenant = tenant;
sandbox->route = route;
sandbox->timestamp_of.request_arrival = request_arrival_timestamp;
sandbox->absolute_deadline = request_arrival_timestamp + module->relative_deadline;
sandbox->absolute_deadline = request_arrival_timestamp + sandbox->route->relative_deadline;
/*
* Admissions Control State
@ -171,8 +173,8 @@ sandbox_init(struct sandbox *sandbox, struct module *module, struct http_session
* @return the new sandbox request
*/
struct sandbox *
sandbox_alloc(struct module *module, struct http_session *session, uint64_t request_arrival_timestamp,
uint64_t admissions_estimate)
sandbox_alloc(struct module *module, struct http_session *session, struct route *route, struct tenant *tenant,
uint64_t request_arrival_timestamp, uint64_t admissions_estimate)
{
struct sandbox *sandbox = NULL;
size_t page_aligned_sandbox_size = round_up_to_page(sizeof(struct sandbox));
@ -181,7 +183,7 @@ sandbox_alloc(struct module *module, struct http_session *session, uint64_t requ
if (unlikely(sandbox == NULL)) return NULL;
sandbox_set_as_allocated(sandbox);
sandbox_init(sandbox, module, session, request_arrival_timestamp, admissions_estimate);
sandbox_init(sandbox, module, session, route, tenant, request_arrival_timestamp, admissions_estimate);
return sandbox;

@ -0,0 +1,28 @@
#include "tenant.h"
#include "tenant_functions.h"
/**
* Start the tenant as a server listening at tenant->port
* @param tenant
* @returns 0 on success, -1 on error
*/
int
tenant_listen(struct tenant *tenant)
{
int rc = tcp_server_listen(&tenant->tcp_server);
if (rc < 0) goto err;
/* Set the socket descriptor and register with our global epoll instance to monitor for incoming HTTP requests
*/
rc = listener_thread_register_tenant(tenant);
if (unlikely(rc < 0)) goto err_add_to_epoll;
done:
return rc;
err_add_to_epoll:
tcp_server_close(&tenant->tcp_server);
err:
rc = -1;
goto done;
}

@ -0,0 +1,81 @@
#include <errno.h>
#include "tenant.h"
#include "panic.h"
/*******************
* Tenant Database *
******************/
struct tenant *tenant_database[TENANT_DATABASE_CAPACITY] = { NULL };
size_t tenant_database_count = 0;
/**
* Adds a tenant to the in-memory tenant DB
* @param tenant tenant to add
* @return 0 on success. -ENOSPC when full
*/
int
tenant_database_add(struct tenant *tenant)
{
assert(tenant_database_count <= TENANT_DATABASE_CAPACITY);
int rc;
if (tenant_database_count == TENANT_DATABASE_CAPACITY) goto err_no_space;
tenant_database[tenant_database_count++] = tenant;
rc = 0;
done:
return rc;
err_no_space:
panic("Cannot add tenant. Database is full.\n");
rc = -ENOSPC;
goto done;
}
/**
* Given a name, find the associated tenant
* @param name
* @return tenant or NULL if no match found
*/
struct tenant *
tenant_database_find_by_name(char *name)
{
for (size_t i = 0; i < tenant_database_count; i++) {
assert(tenant_database[i]);
if (strcmp(tenant_database[i]->name, name) == 0) return tenant_database[i];
}
return NULL;
}
/**
* Given a socket_descriptor, find the associated tenant
* @param socket_descriptor
* @return tenant or NULL if no match found
*/
struct tenant *
tenant_database_find_by_socket_descriptor(int socket_descriptor)
{
for (size_t i = 0; i < tenant_database_count; i++) {
assert(tenant_database[i]);
if (tenant_database[i]->tcp_server.socket_descriptor == socket_descriptor) return tenant_database[i];
}
return NULL;
}
/**
* Given a port, find the associated tenant
* @param port
* @return tenant or NULL if no match found
*/
struct tenant *
tenant_database_find_by_port(uint16_t port)
{
for (size_t i = 0; i < tenant_database_count; i++) {
assert(tenant_database[i]);
if (tenant_database[i]->tcp_server.port == port) return tenant_database[i];
}
return NULL;
}

@ -35,16 +35,16 @@ debug: sledgert fibonacci
--eval-command="run spec.json"
client-fib10-once:
echo "10" | http :10010
http :10020/fib?10
client-fib40-once:
echo "40" | http :10040
http :10010/fib2?40
client-preempt:
(echo "40" | http :10040 &); echo "10" | http :10010
(http :10010/fib2?40 &); http :10010/fib?10
client-fib10-multi:
hey -z ${DURATION_SEC}s -cpus 4 -c 100 -t 0 -o csv -m GET -d "10\n" "http://${HOSTNAME}:10010"
hey -z ${DURATION_SEC}s -cpus 4 -c 100 -t 0 -o csv -m GET -d "10\n" "http://${HOSTNAME}:10020/fib"
client-fib40-multi:
hey -z ${DURATION_SEC}s -cpus 4 -c 100 -t 0 -o csv -m GET -d "40\n" "http://${HOSTNAME}:10040"
hey -z ${DURATION_SEC}s -cpus 4 -c 100 -t 0 -o csv -m GET -d "40\n" "http://${HOSTNAME}:10010/fib2"

@ -1,26 +1,44 @@
[
{
"name": "fibonacci",
"path": "fibonacci.wasm.so",
"name": "gwu",
"port": 10010,
"route": "/fib",
"expected-execution-us": 6000,
"admissions-percentile": 70,
"relative-deadline-us": 20000,
"http-req-size": 1024,
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
"routes": [
{
"route": "/fib",
"path": "fibonacci.wasm.so",
"admissions-percentile": 70,
"expected-execution-us": 6000,
"relative-deadline-us": 20000,
"http-req-size": 1024,
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"route": "/fib2",
"path": "fibonacci.wasm.so",
"expected-execution-us": 10000000,
"admissions-percentile": 70,
"relative-deadline-us": 20000000,
"http-req-size": 1024,
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}
]
},
{
"name": "fibonacci_40",
"path": "fibonacci.wasm.so",
"port": 10040,
"route": "/fib",
"expected-execution-us": 10000000,
"admissions-percentile": 70,
"relative-deadline-us": 20000000,
"http-req-size": 1024,
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
"name": "conix",
"port": 10020,
"routes": [
{
"route": "/fib",
"path": "fibonacci.wasm.so",
"admissions-percentile": 70,
"expected-execution-us": 6000,
"relative-deadline-us": 20000,
"http-req-size": 1024,
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}
]
}
]

Loading…
Cancel
Save