Merge pull request #208 from gwsystems/unroll-inlines

Unroll inlines
main
Sean McBride 4 years ago committed by GitHub
commit 63ca27ab7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,125 +1,14 @@
#pragma once #pragma once
#include <stdatomic.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include <unistd.h>
#include "debuglog.h"
#include "client_socket.h"
#define ADMISSIONS_CONTROL_GRANULARITY 1000000 #define ADMISSIONS_CONTROL_GRANULARITY 1000000
/* void admissions_control_initialize();
* Unitless estimate of the instantaneous fraction of system capacity required to complete all previously void admissions_control_add(uint64_t admissions_estimate);
* admitted work. This is used to calculate free capacity as part of admissions control void admissions_control_subtract(uint64_t admissions_estimate);
* uint64_t admissions_control_calculate_estimate(uint64_t estimated_execution, uint64_t relative_deadline);
* The estimated requirements of a single admitted request is calculated as uint64_t admissions_control_calculate_estimate_us(uint32_t estimated_execution_us, uint32_t relative_deadline_us);
* estimated execution time (cycles) / relative deadline (cycles) void admissions_control_log_decision(uint64_t admissions_estimate, bool admitted);
* uint64_t admissions_control_decide(uint64_t admissions_estimate);
* These estimates are incremented on request acceptance and decremented on request completion (either
* success or failure)
*/
extern _Atomic uint64_t admissions_control_admitted;
extern uint64_t admissions_control_capacity;
extern const double admissions_control_overhead;
static inline void
admissions_control_initialize()
{
#ifdef ADMISSIONS_CONTROL
atomic_init(&admissions_control_admitted, 0);
admissions_control_capacity = runtime_worker_threads_count * ADMISSIONS_CONTROL_GRANULARITY
* ((double)1.0 - admissions_control_overhead);
#endif
}
static inline void
admissions_control_add(uint64_t admissions_estimate)
{
#ifdef ADMISSIONS_CONTROL
assert(admissions_estimate > 0);
atomic_fetch_add(&admissions_control_admitted, admissions_estimate);
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity);
#endif
#endif /* ADMISSIONS_CONTROL */
}
static inline void
admissions_control_subtract(uint64_t admissions_estimate)
{
#ifdef ADMISSIONS_CONTROL
/* Assumption: Should never underflow */
if (unlikely(admissions_estimate > admissions_control_admitted)) panic("Admissions Estimate underflow\n");
atomic_fetch_sub(&admissions_control_admitted, admissions_estimate);
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity);
#endif
#endif /* ADMISSIONS_CONTROL */
}
static inline uint64_t
admissions_control_calculate_estimate(uint64_t estimated_execution, uint64_t relative_deadline)
{
#ifdef ADMISSIONS_CONTROL
assert(relative_deadline != 0);
uint64_t admissions_estimate = (estimated_execution * (uint64_t)ADMISSIONS_CONTROL_GRANULARITY)
/ relative_deadline;
if (admissions_estimate == 0)
panic("Ratio of Deadline to Execution time cannot exceed %d\n", ADMISSIONS_CONTROL_GRANULARITY);
return admissions_estimate;
#else
return 0;
#endif
}
static inline uint64_t
admissions_control_calculate_estimate_us(uint32_t estimated_execution_us, uint32_t relative_deadline_us)
{
#ifdef ADMISSIONS_CONTROL
assert(relative_deadline_us != 0);
return (uint64_t)((uint64_t)(estimated_execution_us * ADMISSIONS_CONTROL_GRANULARITY)) / relative_deadline_us;
#else
return 0;
#endif
}
static inline void
admissions_control_log_decision(uint64_t admissions_estimate, bool admitted)
{
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Admitted: %lu, Capacity: %lu, Estimate: %lu, Admitted? %s\n", admissions_control_admitted,
admissions_control_capacity, admissions_estimate, admitted ? "yes" : "no");
#endif /* LOG_ADMISSIONS_CONTROL */
}
static inline uint64_t
admissions_control_decide(uint64_t admissions_estimate)
{
uint64_t work_admitted = 1; /* Nominal non-zero value in case admissions control is disabled */
#ifdef ADMISSIONS_CONTROL
if (unlikely(admissions_estimate == 0)) panic("Admissions estimate should never be zero");
uint64_t total_admitted = atomic_load(&admissions_control_admitted);
if (total_admitted + admissions_estimate >= admissions_control_capacity) {
admissions_control_log_decision(admissions_estimate, false);
work_admitted = 0;
} else {
admissions_control_log_decision(admissions_estimate, true);
admissions_control_add(admissions_estimate);
work_admitted = admissions_estimate;
}
#endif /* ADMISSIONS_CONTROL */
return work_admitted;
}

@ -1,6 +1,5 @@
#pragma once #pragma once
#include "debuglog.h"
#include "perf_window.h" #include "perf_window.h"
struct admissions_info { struct admissions_info {
@ -11,51 +10,6 @@ struct admissions_info {
uint64_t relative_deadline; /* Relative deadline in cycles. This is duplicated state */ uint64_t relative_deadline; /* Relative deadline in cycles. This is duplicated state */
}; };
/** void admissions_info_initialize(struct admissions_info *self, int percentile, uint64_t expected_execution,
* Initializes perf window uint64_t relative_deadline);
* @param self void admissions_info_update(struct admissions_info *self, uint64_t execution_duration);
*/
static inline void
admissions_info_initialize(struct admissions_info *self, int percentile, uint64_t expected_execution,
uint64_t relative_deadline)
{
#ifdef ADMISSIONS_CONTROL
assert(relative_deadline > 0);
assert(expected_execution > 0);
self->relative_deadline = relative_deadline;
self->estimate = admissions_control_calculate_estimate(expected_execution, relative_deadline);
debuglog("Initial Estimate: %lu\n", self->estimate);
assert(self != NULL);
perf_window_initialize(&self->perf_window);
if (unlikely(percentile < 50 || percentile > 99)) panic("Invalid admissions percentile");
self->percentile = percentile;
self->control_index = PERF_WINDOW_BUFFER_SIZE * percentile / 100;
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Percentile: %d\n", self->percentile);
debuglog("Control Index: %d\n", self->control_index);
#endif
#endif
}
/*
* Adds an execution value to the perf window and calculates and caches and updated estimate
* @param self
* @param execution_duration
*/
static inline void
admissions_info_update(struct admissions_info *self, uint64_t execution_duration)
{
#ifdef ADMISSIONS_CONTROL
assert(!software_interrupt_is_enabled());
struct perf_window *perf_window = &self->perf_window;
LOCK_LOCK(&self->perf_window.lock);
perf_window_add(perf_window, execution_duration);
uint64_t estimated_execution = perf_window_get_percentile(perf_window, self->percentile, self->control_index);
self->estimate = admissions_control_calculate_estimate(estimated_execution, self->relative_deadline);
LOCK_UNLOCK(&self->perf_window.lock);
#endif
}

@ -1,81 +1,7 @@
#pragma once #pragma once
#include <arpa/inet.h> #include <sys/socket.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "panic.h" void client_socket_close(int client_socket, struct sockaddr *client_address);
#include "debuglog.h"
#include "http_response.h"
#include "http_total.h"
#include "runtime.h"
#include "worker_thread.h"
int client_socket_send(int client_socket, int status_code);
static inline void
client_socket_close(int client_socket, struct sockaddr *client_address)
{
/* Should never close 0, 1, or 2 */
assert(client_socket != STDIN_FILENO);
assert(client_socket != STDOUT_FILENO);
assert(client_socket != STDERR_FILENO);
if (unlikely(close(client_socket) < 0)) {
char client_address_text[INET6_ADDRSTRLEN] = {};
if (unlikely(inet_ntop(AF_INET, &client_address, client_address_text, INET6_ADDRSTRLEN) == NULL)) {
debuglog("Failed to log client_address: %s", strerror(errno));
}
debuglog("Error closing client socket %d associated with %s - %s", client_socket, client_address_text,
strerror(errno));
}
}
/**
* Rejects request due to admission control or error
* @param client_socket - the client we are rejecting
* @param status_code - either 503 or 400
*/
static inline int
client_socket_send(int client_socket, int status_code)
{
const char *response;
int rc;
switch (status_code) {
case 503:
response = HTTP_RESPONSE_503_SERVICE_UNAVAILABLE;
http_total_increment_5XX();
break;
case 400:
response = HTTP_RESPONSE_400_BAD_REQUEST;
http_total_increment_4XX();
break;
default:
panic("%d is not a valid status code\n", status_code);
}
int sent = 0;
int to_send = strlen(response);
while (sent < to_send) {
rc = write(client_socket, &response[sent], to_send - sent);
if (rc < 0) {
if (errno == EAGAIN) { debuglog("Unexpectedly blocking on write of %s\n", response); }
debuglog("Error with %s\n", strerror(errno));
goto send_err;
}
sent += rc;
};
rc = 0;
done:
return rc;
send_err:
debuglog("Error sending to client: %s", strerror(errno));
rc = -1;
goto done;
}

@ -2,9 +2,5 @@
#include "sandbox.h" #include "sandbox.h"
void current_sandbox_close_file_descriptor(int io_handle_index);
struct sandbox *current_sandbox_get(void); struct sandbox *current_sandbox_get(void);
int current_sandbox_get_file_descriptor(int io_handle_index);
int current_sandbox_initialize_io_handle(void);
void current_sandbox_set(struct sandbox *sandbox); void current_sandbox_set(struct sandbox *sandbox);
int current_sandbox_set_file_descriptor(int io_handle_index, int file_descriptor);

@ -2,25 +2,8 @@
#include <stdint.h> #include <stdint.h>
#include "arch/getcycles.h"
#include "debuglog.h"
extern __thread uint64_t generic_thread_lock_duration; extern __thread uint64_t generic_thread_lock_duration;
extern __thread uint64_t generic_thread_start_timestamp; extern __thread uint64_t generic_thread_start_timestamp;
/** void generic_thread_dump_lock_overhead();
* Reports lock contention
*/
static inline void
generic_thread_dump_lock_overhead()
{
#ifndef NDEBUG
#ifdef LOG_LOCK_OVERHEAD
uint64_t duration = __getcycles() - generic_thread_start_timestamp;
debuglog("Locks consumed %lu / %lu cycles, or %f%%\n", generic_thread_lock_duration, duration,
(double)generic_thread_lock_duration / duration * 100);
#endif
#endif
}
void generic_thread_initialize(); void generic_thread_initialize();

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <stdbool.h> #include <stdbool.h>
#include <stdio.h>
#include "http.h" #include "http.h"
@ -28,22 +27,5 @@ struct http_request {
bool message_end; /* boolean flag set when body processing is complete */ bool message_end; /* boolean flag set when body processing is complete */
}; };
static inline void int http_request_get_body(struct http_request *http_request, char **body);
http_request_print(struct http_request *self) void http_request_print(struct http_request *self);
{
printf("Header Count %d\n", self->header_count);
printf("Header Content:\n");
for (int i = 0; i < self->header_count; i++) {
for (int j = 0; j < self->headers[i].key_length; j++) { putchar(self->headers[i].key[j]); }
putchar(':');
for (int j = 0; j < self->headers[i].value_length; j++) { putchar(self->headers[i].value[j]); }
putchar('\n');
}
printf("Body Length %d\n", self->body_length);
printf("Body Read Length %d\n", self->body_read_length);
}
/***************************************************
* General HTTP Request Functions *
**************************************************/
int http_request_get_body(struct http_request *http_request, char **body);

@ -17,41 +17,8 @@ extern _Atomic uint32_t http_total_2XX;
extern _Atomic uint32_t http_total_4XX; extern _Atomic uint32_t http_total_4XX;
#endif #endif
static inline void void http_total_init();
http_total_init() void http_total_increment_request();
{ void http_total_increment_2xx();
atomic_init(&http_total_requests, 0); void http_total_increment_4XX();
atomic_init(&http_total_5XX, 0); void http_total_increment_5XX();
#ifdef LOG_TOTAL_REQS_RESPS
atomic_init(&http_total_2XX, 0);
atomic_init(&http_total_4XX, 0);
#endif
}
static inline void
http_total_increment_request()
{
atomic_fetch_add(&http_total_requests, 1);
}
static inline void
http_total_increment_2xx()
{
#ifdef LOG_TOTAL_REQS_RESPS
atomic_fetch_add(&http_total_2XX, 1);
#endif
}
static inline void
http_total_increment_4XX()
{
#ifdef LOG_TOTAL_REQS_RESPS
atomic_fetch_add(&http_total_4XX, 1);
#endif
}
static inline void
http_total_increment_5XX()
{
atomic_fetch_add(&http_total_5XX, 1);
}

@ -1,10 +1,24 @@
#pragma once #pragma once
#include <stdbool.h>
#include "generic_thread.h" #include "generic_thread.h"
#include "module.h" #include "module.h"
#define LISTENER_THREAD_CORE_ID 0 #define LISTENER_THREAD_CORE_ID 0
extern pthread_t listener_thread_id;
void listener_thread_initialize(void); void listener_thread_initialize(void);
__attribute__((noreturn)) void *listener_thread_main(void *dummy); __attribute__((noreturn)) void *listener_thread_main(void *dummy);
int listener_thread_register_module(struct module *mod); int listener_thread_register_module(struct module *mod);
/**
* Used to determine if running in the context of a listener thread
* @returns true if listener. false if not (probably a worker)
*/
static inline bool
listener_thread_is_running()
{
return pthread_self() == listener_thread_id;
}

@ -19,11 +19,9 @@ struct local_runqueue_config {
local_runqueue_preempt_fn_t preempt_fn; local_runqueue_preempt_fn_t preempt_fn;
}; };
void local_runqueue_initialize(struct local_runqueue_config *config);
/* This is currently only used by worker_thread_wakeup_sandbox */
void local_runqueue_add(struct sandbox *); void local_runqueue_add(struct sandbox *);
void local_runqueue_delete(struct sandbox *); void local_runqueue_delete(struct sandbox *);
bool local_runqueue_is_empty(); bool local_runqueue_is_empty();
struct sandbox *local_runqueue_get_next(); struct sandbox *local_runqueue_get_next();
void local_runqueue_initialize(struct local_runqueue_config *config);
void local_runqueue_preempt(ucontext_t *); void local_runqueue_preempt(ucontext_t *);

@ -28,7 +28,7 @@ typedef ck_spinlock_mcs_t lock_t;
* @param unique_variable_name - a unique prefix to hygienically namespace an associated lock/unlock pair * @param unique_variable_name - a unique prefix to hygienically namespace an associated lock/unlock pair
*/ */
#define LOCK_LOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \ #define LOCK_LOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \
assert(!runtime_is_worker() || !software_interrupt_is_enabled()); \ assert(listener_thread_is_running() || !software_interrupt_is_enabled()); \
struct ck_spinlock_mcs _hygiene_##unique_variable_name##_node; \ struct ck_spinlock_mcs _hygiene_##unique_variable_name##_node; \
uint64_t _hygiene_##unique_variable_name##_pre = __getcycles(); \ uint64_t _hygiene_##unique_variable_name##_pre = __getcycles(); \
ck_spinlock_mcs_lock((lock), &(_hygiene_##unique_variable_name##_node)); \ ck_spinlock_mcs_lock((lock), &(_hygiene_##unique_variable_name##_node)); \
@ -39,8 +39,8 @@ typedef ck_spinlock_mcs_t lock_t;
* @param lock - the address of the lock * @param lock - the address of the lock
* @param unique_variable_name - a unique prefix to hygienically namespace an associated lock/unlock pair * @param unique_variable_name - a unique prefix to hygienically namespace an associated lock/unlock pair
*/ */
#define LOCK_UNLOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \ #define LOCK_UNLOCK_WITH_BOOKKEEPING(lock, unique_variable_name) \
assert(!runtime_is_worker() || !software_interrupt_is_enabled()); \ assert(listener_thread_is_running() || !software_interrupt_is_enabled()); \
ck_spinlock_mcs_unlock(lock, &(_hygiene_##unique_variable_name##_node)); ck_spinlock_mcs_unlock(lock, &(_hygiene_##unique_variable_name##_node));
/** /**

@ -207,29 +207,6 @@ module_release(struct module *module)
return; return;
} }
/**
* Sets the HTTP Request and Response Headers and Content type on a module
* @param module
* @param request_count
* @param request_headers
* @param request_content_type
* @param response_count
* @param response_headers
* @param response_content_type
*/
static inline void
module_set_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[],
int response_count, char *response_headers, char response_content_type[])
{
assert(module);
module->request_header_count = request_count;
memcpy(module->request_headers, request_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT);
strcpy(module->request_content_type, request_content_type);
module->response_header_count = response_count;
memcpy(module->response_headers, response_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT);
strcpy(module->response_content_type, response_content_type);
}
/******************************** /********************************
* Public Methods from module.c * * Public Methods from module.c *
*******************************/ *******************************/

@ -1,36 +1,7 @@
#pragma once #pragma once
#include <errno.h>
#include "panic.h"
#include "module.h" #include "module.h"
int module_database_add(struct module *module);
struct module *module_database_find_by_name(char *name); struct module *module_database_find_by_name(char *name);
struct module *module_database_find_by_socket_descriptor(int socket_descriptor); struct module *module_database_find_by_socket_descriptor(int socket_descriptor);
extern struct module *module_database[];
extern size_t module_database_count;
/**
* Adds a module to the in-memory module DB
* @param module module to add
* @return 0 on success. -ENOSPC when full
*/
static inline int
module_database_add(struct module *module)
{
assert(module_database_count <= MODULE_MAX_MODULE_COUNT);
int rc;
if (module_database_count == MODULE_MAX_MODULE_COUNT) goto err_no_space;
module_database[module_database_count++] = module;
rc = 0;
done:
return rc;
err_no_space:
panic("Cannot add module. Database is full.\n");
rc = -ENOSPC;
goto done;
}

@ -71,21 +71,6 @@ extern void runtime_initialize(void);
extern void runtime_set_resource_limits_to_max(); extern void runtime_set_resource_limits_to_max();
extern void stub_init(int32_t offset); extern void stub_init(int32_t offset);
/**
* Used to determine if running in the context of a worker thread
* @returns true if worker. false if listener core
*/
static inline bool
runtime_is_worker()
{
pthread_t self = pthread_self();
for (int i = 0; i < runtime_worker_threads_count; i++) {
if (runtime_worker_threads[i] == self) return true;
}
return false;
}
static inline char * static inline char *
runtime_print_scheduler(enum RUNTIME_SCHEDULER variant) runtime_print_scheduler(enum RUNTIME_SCHEDULER variant)
{ {

@ -103,15 +103,6 @@ struct sandbox {
char request_response_data[1]; /* of request_response_data_length, following sandbox mem.. */ char request_response_data[1]; /* of request_response_data_length, following sandbox mem.. */
} PAGE_ALIGNED; } PAGE_ALIGNED;
/***************************
* Externs *
**************************/
extern void worker_thread_block_current_sandbox(void);
extern void worker_thread_on_sandbox_exit(struct sandbox *sandbox);
extern void worker_thread_process_io(void);
extern void worker_thread_wakeup_sandbox(struct sandbox *sandbox);
/*************************** /***************************
* Public API * * Public API *
**************************/ **************************/
@ -120,7 +111,6 @@ struct sandbox *sandbox_allocate(struct sandbox_request *sandbox_request);
void sandbox_free(struct sandbox *sandbox); void sandbox_free(struct sandbox *sandbox);
void sandbox_free_linear_memory(struct sandbox *sandbox); void sandbox_free_linear_memory(struct sandbox *sandbox);
void sandbox_main(struct sandbox *sandbox); void sandbox_main(struct sandbox *sandbox);
size_t sandbox_parse_http_request(struct sandbox *sandbox, size_t length);
/** /**
@ -165,26 +155,6 @@ sandbox_initialize_io_handle(struct sandbox *sandbox)
return io_handle_index; return io_handle_index;
} }
/**
* Initializes and returns an IO handle of a sandbox ready for use
* @param sandbox
* @param file_descriptor what we'll set on the IO handle after initialization
* @return index of handle we preopened or -1 if all io_handles are exhausted
*/
static inline int
sandbox_initialize_io_handle_and_set_file_descriptor(struct sandbox *sandbox, int file_descriptor)
{
if (!sandbox) return -1;
if (file_descriptor < 0) return file_descriptor;
int io_handle_index = sandbox_initialize_io_handle(sandbox);
if (io_handle_index != -1) {
sandbox->io_handles[io_handle_index].file_descriptor =
file_descriptor; /* per sandbox, so synchronization necessary! */
}
return io_handle_index;
}
/** /**
* Sets the file descriptor of the sandbox's ith io_handle * Sets the file descriptor of the sandbox's ith io_handle
* Returns error condition if the file_descriptor to set does not contain sandbox preopen magic * Returns error condition if the file_descriptor to set does not contain sandbox preopen magic
@ -232,74 +202,9 @@ sandbox_close_file_descriptor(struct sandbox *sandbox, int io_handle_index)
sandbox->io_handles[io_handle_index].file_descriptor = -1; sandbox->io_handles[io_handle_index].file_descriptor = -1;
} }
/** void sandbox_close_http(struct sandbox *sandbox);
* Prints key performance metrics for a sandbox to runtime_sandbox_perf_log void sandbox_print_perf(struct sandbox *sandbox);
* This is defined by an environment variable void sandbox_summarize_page_allocations(struct sandbox *sandbox);
* @param sandbox
*/
static inline void
sandbox_print_perf(struct sandbox *sandbox)
{
/* If the log was not defined by an environment variable, early out */
if (runtime_sandbox_perf_log == NULL) return;
uint32_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz;
uint32_t queued_us = (sandbox->allocation_timestamp - sandbox->request_arrival_timestamp)
/ runtime_processor_speed_MHz;
uint32_t initializing_us = sandbox->initializing_duration / runtime_processor_speed_MHz;
uint32_t runnable_us = sandbox->runnable_duration / runtime_processor_speed_MHz;
uint32_t running_us = sandbox->running_duration / runtime_processor_speed_MHz;
uint32_t blocked_us = sandbox->blocked_duration / runtime_processor_speed_MHz;
uint32_t returned_us = sandbox->returned_duration / runtime_processor_speed_MHz;
/*
* Assumption: A sandbox is never able to free pages. If linear memory management
* becomes more intelligent, then peak linear memory size needs to be tracked
* seperately from current linear memory size.
*/
fprintf(runtime_sandbox_perf_log, "%lu,%s():%d,%s,%u,%u,%u,%u,%u,%u,%u,%u,%u\n", sandbox->id,
sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state),
sandbox->module->relative_deadline_us, total_time_us, queued_us, initializing_us, runnable_us,
running_us, blocked_us, returned_us, sandbox->linear_memory_size);
}
static inline void
sandbox_summarize_page_allocations(struct sandbox *sandbox)
{
#ifdef LOG_SANDBOX_MEMORY_PROFILE
// TODO: Handle interleavings
char sandbox_page_allocations_log_path[100] = {};
sandbox_page_allocations_log_path[99] = '\0';
snprintf(sandbox_page_allocations_log_path, 99, "%s_%d_page_allocations.csv", sandbox->module->name,
sandbox->module->port);
debuglog("Logging to %s", sandbox_page_allocations_log_path);
FILE *sandbox_page_allocations_log = fopen(sandbox_page_allocations_log_path, "a");
fprintf(sandbox_page_allocations_log, "%lu,%lu,%s,", sandbox->id, sandbox->running_duration,
sandbox_state_stringify(sandbox->state));
for (size_t i = 0; i < sandbox->page_allocation_timestamps_size; i++)
fprintf(sandbox_page_allocations_log, "%u,", sandbox->page_allocation_timestamps[i]);
fprintf(sandbox_page_allocations_log, "\n");
#else
return;
#endif
}
static inline void
sandbox_close_http(struct sandbox *sandbox)
{
assert(sandbox != NULL);
int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, sandbox->client_socket_descriptor, NULL);
if (unlikely(rc < 0)) panic_err();
client_socket_close(sandbox->client_socket_descriptor, &sandbox->client_address);
}
INLINE void sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sandbox_request, INLINE void sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sandbox_request,
uint64_t allocation_timestamp); uint64_t allocation_timestamp);

@ -47,3 +47,6 @@ worker_thread_get_memory_string(uint32_t offset, uint32_t max_length)
} }
return NULL; return NULL;
} }
void worker_thread_block_current_sandbox(void);
__attribute__((noreturn)) void worker_thread_on_sandbox_exit();

@ -1,6 +1,120 @@
#include <stdatomic.h>
#include <unistd.h>
#include "admissions_control.h" #include "admissions_control.h"
#include "debuglog.h"
#include "client_socket.h"
/*
* Unitless estimate of the instantaneous fraction of system capacity required to complete all previously
* admitted work. This is used to calculate free capacity as part of admissions control
*
* The estimated requirements of a single admitted request is calculated as
* estimated execution time (cycles) / relative deadline (cycles)
*
* These estimates are incremented on request acceptance and decremented on request completion (either
* success or failure)
*/
_Atomic uint64_t admissions_control_admitted; _Atomic uint64_t admissions_control_admitted;
uint64_t admissions_control_capacity; uint64_t admissions_control_capacity;
const double admissions_control_overhead = 0.2; const double admissions_control_overhead = 0.2;
void
admissions_control_initialize()
{
#ifdef ADMISSIONS_CONTROL
atomic_init(&admissions_control_admitted, 0);
admissions_control_capacity = runtime_worker_threads_count * ADMISSIONS_CONTROL_GRANULARITY
* ((double)1.0 - admissions_control_overhead);
#endif
}
void
admissions_control_add(uint64_t admissions_estimate)
{
#ifdef ADMISSIONS_CONTROL
assert(admissions_estimate > 0);
atomic_fetch_add(&admissions_control_admitted, admissions_estimate);
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity);
#endif
#endif /* ADMISSIONS_CONTROL */
}
void
admissions_control_subtract(uint64_t admissions_estimate)
{
#ifdef ADMISSIONS_CONTROL
/* Assumption: Should never underflow */
if (unlikely(admissions_estimate > admissions_control_admitted)) panic("Admissions Estimate underflow\n");
atomic_fetch_sub(&admissions_control_admitted, admissions_estimate);
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %lu / %lu\n", admissions_control_admitted, admissions_control_capacity);
#endif
#endif /* ADMISSIONS_CONTROL */
}
uint64_t
admissions_control_calculate_estimate(uint64_t estimated_execution, uint64_t relative_deadline)
{
#ifdef ADMISSIONS_CONTROL
assert(relative_deadline != 0);
uint64_t admissions_estimate = (estimated_execution * (uint64_t)ADMISSIONS_CONTROL_GRANULARITY)
/ relative_deadline;
if (admissions_estimate == 0)
panic("Ratio of Deadline to Execution time cannot exceed %d\n", ADMISSIONS_CONTROL_GRANULARITY);
return admissions_estimate;
#else
return 0;
#endif
}
uint64_t
admissions_control_calculate_estimate_us(uint32_t estimated_execution_us, uint32_t relative_deadline_us)
{
#ifdef ADMISSIONS_CONTROL
assert(relative_deadline_us != 0);
return (uint64_t)((uint64_t)(estimated_execution_us * ADMISSIONS_CONTROL_GRANULARITY)) / relative_deadline_us;
#else
return 0;
#endif
}
void
admissions_control_log_decision(uint64_t admissions_estimate, bool admitted)
{
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Admitted: %lu, Capacity: %lu, Estimate: %lu, Admitted? %s\n", admissions_control_admitted,
admissions_control_capacity, admissions_estimate, admitted ? "yes" : "no");
#endif /* LOG_ADMISSIONS_CONTROL */
}
uint64_t
admissions_control_decide(uint64_t admissions_estimate)
{
uint64_t work_admitted = 1; /* Nominal non-zero value in case admissions control is disabled */
#ifdef ADMISSIONS_CONTROL
if (unlikely(admissions_estimate == 0)) panic("Admissions estimate should never be zero");
uint64_t total_admitted = atomic_load(&admissions_control_admitted);
if (total_admitted + admissions_estimate >= admissions_control_capacity) {
admissions_control_log_decision(admissions_estimate, false);
work_admitted = 0;
} else {
admissions_control_log_decision(admissions_estimate, true);
admissions_control_add(admissions_estimate);
work_admitted = admissions_estimate;
}
#endif /* ADMISSIONS_CONTROL */
return work_admitted;
}

@ -0,0 +1,52 @@
#include "admissions_info.h"
#include "debuglog.h"
/**
* Initializes perf window
* @param self
*/
void
admissions_info_initialize(struct admissions_info *self, int percentile, uint64_t expected_execution,
uint64_t relative_deadline)
{
#ifdef ADMISSIONS_CONTROL
assert(relative_deadline > 0);
assert(expected_execution > 0);
self->relative_deadline = relative_deadline;
self->estimate = admissions_control_calculate_estimate(expected_execution, relative_deadline);
debuglog("Initial Estimate: %lu\n", self->estimate);
assert(self != NULL);
perf_window_initialize(&self->perf_window);
if (unlikely(percentile < 50 || percentile > 99)) panic("Invalid admissions percentile");
self->percentile = percentile;
self->control_index = PERF_WINDOW_BUFFER_SIZE * percentile / 100;
#ifdef LOG_ADMISSIONS_CONTROL
debuglog("Percentile: %d\n", self->percentile);
debuglog("Control Index: %d\n", self->control_index);
#endif
#endif
}
/*
* Adds an execution value to the perf window and calculates and caches and updated estimate
* @param self
* @param execution_duration
*/
void
admissions_info_update(struct admissions_info *self, uint64_t execution_duration)
{
#ifdef ADMISSIONS_CONTROL
assert(!software_interrupt_is_enabled());
struct perf_window *perf_window = &self->perf_window;
LOCK_LOCK(&self->perf_window.lock);
perf_window_add(perf_window, execution_duration);
uint64_t estimated_execution = perf_window_get_percentile(perf_window, self->percentile, self->control_index);
self->estimate = admissions_control_calculate_estimate(estimated_execution, self->relative_deadline);
LOCK_UNLOCK(&self->perf_window.lock);
#endif
}

@ -0,0 +1,78 @@
#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include "client_socket.h"
#include "debuglog.h"
#include "http_response.h"
#include "http_total.h"
#include "likely.h"
#include "panic.h"
#include "worker_thread.h"
void
client_socket_close(int client_socket, struct sockaddr *client_address)
{
/* Should never close 0, 1, or 2 */
assert(client_socket != STDIN_FILENO);
assert(client_socket != STDOUT_FILENO);
assert(client_socket != STDERR_FILENO);
if (unlikely(close(client_socket) < 0)) {
char client_address_text[INET6_ADDRSTRLEN] = {};
if (unlikely(inet_ntop(AF_INET, &client_address, client_address_text, INET6_ADDRSTRLEN) == NULL)) {
debuglog("Failed to log client_address: %s", strerror(errno));
}
debuglog("Error closing client socket %d associated with %s - %s", client_socket, client_address_text,
strerror(errno));
}
}
/**
* Rejects request due to admission control or error
* @param client_socket - the client we are rejecting
* @param status_code - either 503 or 400
*/
int
client_socket_send(int client_socket, int status_code)
{
const char *response;
int rc;
switch (status_code) {
case 503:
response = HTTP_RESPONSE_503_SERVICE_UNAVAILABLE;
http_total_increment_5XX();
break;
case 400:
response = HTTP_RESPONSE_400_BAD_REQUEST;
http_total_increment_4XX();
break;
default:
panic("%d is not a valid status code\n", status_code);
}
int sent = 0;
int to_send = strlen(response);
while (sent < to_send) {
rc = write(client_socket, &response[sent], to_send - sent);
if (rc < 0) {
if (errno == EAGAIN) { debuglog("Unexpectedly blocking on write of %s\n", response); }
debuglog("Error with %s\n", strerror(errno));
goto send_err;
}
sent += rc;
};
rc = 0;
done:
return rc;
send_err:
debuglog("Error sending to client: %s", strerror(errno));
rc = -1;
goto done;
}

@ -43,51 +43,3 @@ current_sandbox_set(struct sandbox *sandbox)
worker_thread_current_sandbox = sandbox; worker_thread_current_sandbox = sandbox;
} }
} }
/**
* Initializes and returns an IO handle on the current sandbox ready for use
* @return index of handle we preopened or -1 if all io_handles are exhausted
*/
int
current_sandbox_initialize_io_handle(void)
{
return sandbox_initialize_io_handle(current_sandbox_get());
}
size_t sandbox_parse_http_request(struct sandbox *sandbox, size_t l);
/**
* Sets the file descriptor of the sandbox's ith io_handle
* Returns error condition if the file_descriptor to set does not contain sandbox preopen magin
* @param io_handle_index index of the sandbox io_handle we want to set
* @param file_descriptor the file descripter we want to set it to
* @returns the index that was set or -1 in case of error
*/
int
current_sandbox_set_file_descriptor(int io_handle_index, int file_descriptor)
{
return sandbox_set_file_descriptor(current_sandbox_get(), io_handle_index, file_descriptor);
}
/**
* Get the file descriptor of the sandbox's ith io_handle
* @param io_handle_index index into the sandbox's io_handles table
* @returns file descriptor
*/
int
current_sandbox_get_file_descriptor(int io_handle_index)
{
struct sandbox *sandbox = current_sandbox_get();
return sandbox_get_file_descriptor(sandbox, io_handle_index);
}
/**
* Close the sandbox's ith io_handle
* @param io_handle_index index of the handle to close
*/
void
current_sandbox_close_file_descriptor(int io_handle_index)
{
struct sandbox *sandbox = current_sandbox_get();
sandbox_close_file_descriptor(sandbox, io_handle_index);
}

@ -1,6 +1,7 @@
#include <stdint.h> #include <stdint.h>
#include "arch/getcycles.h" #include "arch/getcycles.h"
#include "debuglog.h"
/* Implemented by listener and workers */ /* Implemented by listener and workers */
@ -13,3 +14,18 @@ generic_thread_initialize()
generic_thread_start_timestamp = __getcycles(); generic_thread_start_timestamp = __getcycles();
generic_thread_lock_duration = 0; generic_thread_lock_duration = 0;
} }
/**
* Reports lock contention
*/
void
generic_thread_dump_lock_overhead()
{
#ifndef NDEBUG
#ifdef LOG_LOCK_OVERHEAD
uint64_t duration = __getcycles() - generic_thread_start_timestamp;
debuglog("Locks consumed %lu / %lu cycles, or %f%%\n", generic_thread_lock_duration, duration,
(double)generic_thread_lock_duration / duration * 100);
#endif
#endif
}

@ -1,6 +1,7 @@
#include <assert.h> #include <assert.h>
#include "global_request_scheduler.h" #include "global_request_scheduler.h"
#include "listener_thread.h"
#include "panic.h" #include "panic.h"
#include "priority_queue.h" #include "priority_queue.h"
#include "runtime.h" #include "runtime.h"
@ -17,7 +18,7 @@ global_request_scheduler_minheap_add(void *sandbox_request)
{ {
assert(sandbox_request); assert(sandbox_request);
assert(global_request_scheduler_minheap); assert(global_request_scheduler_minheap);
if (unlikely(runtime_is_worker())) panic("%s is only callable by the listener thread\n", __func__); if (unlikely(!listener_thread_is_running())) panic("%s is only callable by the listener thread\n", __func__);
int return_code = priority_queue_enqueue(global_request_scheduler_minheap, sandbox_request); int return_code = priority_queue_enqueue(global_request_scheduler_minheap, sandbox_request);
/* TODO: Propagate -1 to caller. Issue #91 */ /* TODO: Propagate -1 to caller. Issue #91 */

@ -1,3 +1,5 @@
#include <stdio.h>
#include "http_request.h" #include "http_request.h"
/*************************************************** /***************************************************
@ -16,3 +18,18 @@ http_request_get_body(struct http_request *http_request, char **body)
*body = http_request->body; *body = http_request->body;
return http_request->body_length; return http_request->body_length;
} }
void
http_request_print(struct http_request *self)
{
printf("Header Count %d\n", self->header_count);
printf("Header Content:\n");
for (int i = 0; i < self->header_count; i++) {
for (int j = 0; j < self->headers[i].key_length; j++) { putchar(self->headers[i].key[j]); }
putchar(':');
for (int j = 0; j < self->headers[i].value_length; j++) { putchar(self->headers[i].value[j]); }
putchar('\n');
}
printf("Body Length %d\n", self->body_length);
printf("Body Read Length %d\n", self->body_read_length);
}

@ -13,6 +13,46 @@ _Atomic uint32_t http_total_2XX = 0;
_Atomic uint32_t http_total_4XX = 0; _Atomic uint32_t http_total_4XX = 0;
#endif #endif
void
http_total_init()
{
atomic_init(&http_total_requests, 0);
atomic_init(&http_total_5XX, 0);
#ifdef LOG_TOTAL_REQS_RESPS
atomic_init(&http_total_2XX, 0);
atomic_init(&http_total_4XX, 0);
#endif
}
void
http_total_increment_request()
{
atomic_fetch_add(&http_total_requests, 1);
}
void
http_total_increment_2xx()
{
#ifdef LOG_TOTAL_REQS_RESPS
atomic_fetch_add(&http_total_2XX, 1);
#endif
}
void
http_total_increment_4XX()
{
#ifdef LOG_TOTAL_REQS_RESPS
atomic_fetch_add(&http_total_4XX, 1);
#endif
}
void
http_total_increment_5XX()
{
atomic_fetch_add(&http_total_5XX, 1);
}
void void
http_total_log() http_total_log()
{ {

@ -131,10 +131,11 @@ err:
int32_t int32_t
wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size) wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size)
{ {
struct sandbox *s = current_sandbox_get();
if (fd == 1 || fd == 2) { if (fd == 1 || fd == 2) {
char * buffer = worker_thread_get_memory_ptr_void(buf_offset, buf_size); char *buffer = worker_thread_get_memory_ptr_void(buf_offset, buf_size);
struct sandbox *s = current_sandbox_get(); int l = s->module->max_response_size - s->request_response_data_length;
int l = s->module->max_response_size - s->request_response_data_length;
if (l > buf_size) l = buf_size; if (l > buf_size) l = buf_size;
if (l == 0) return 0; if (l == 0) return 0;
memcpy(s->request_response_data + s->request_response_data_length, buffer, l); memcpy(s->request_response_data + s->request_response_data_length, buffer, l);
@ -143,7 +144,7 @@ wasm_write(int32_t fd, int32_t buf_offset, int32_t buf_size)
return l; return l;
} }
int f = current_sandbox_get_file_descriptor(fd); int f = sandbox_get_file_descriptor(s, fd);
char *buf = worker_thread_get_memory_ptr_void(buf_offset, buf_size); char *buf = worker_thread_get_memory_ptr_void(buf_offset, buf_size);
int32_t res = 0; int32_t res = 0;
@ -192,7 +193,7 @@ wasm_open(int32_t path_off, int32_t flags, int32_t mode)
{ {
char *path = worker_thread_get_memory_string(path_off, MODULE_MAX_PATH_LENGTH); char *path = worker_thread_get_memory_string(path_off, MODULE_MAX_PATH_LENGTH);
int iofd = current_sandbox_initialize_io_handle(); int iofd = sandbox_initialize_io_handle(current_sandbox_get());
if (iofd < 0) return -1; if (iofd < 0) return -1;
int32_t modified_flags = 0; int32_t modified_flags = 0;
@ -237,7 +238,8 @@ wasm_open(int32_t path_off, int32_t flags, int32_t mode)
int32_t int32_t
wasm_close(int32_t io_handle_index) wasm_close(int32_t io_handle_index)
{ {
int fd = current_sandbox_get_file_descriptor(io_handle_index); struct sandbox *sandbox = current_sandbox_get();
int fd = sandbox_get_file_descriptor(sandbox, io_handle_index);
// Silently disregard client requests to close STDIN, STDOUT, or STDERR // Silently disregard client requests to close STDIN, STDOUT, or STDERR
if (fd <= STDERR_FILENO) return 0; if (fd <= STDERR_FILENO) return 0;

@ -1,6 +1,8 @@
#include <stdint.h> #include <stdint.h>
#include <unistd.h>
#include "arch/getcycles.h" #include "arch/getcycles.h"
#include "client_socket.h"
#include "global_request_scheduler.h" #include "global_request_scheduler.h"
#include "generic_thread.h" #include "generic_thread.h"
#include "listener_thread.h" #include "listener_thread.h"
@ -15,6 +17,8 @@ int listener_thread_epoll_file_descriptor;
/* Timestamp when listener thread began executing */ /* Timestamp when listener thread began executing */
static __thread uint64_t listener_thread_start_timestamp; static __thread uint64_t listener_thread_start_timestamp;
pthread_t listener_thread_id;
/** /**
* Initializes the listener thread, pinned to core 0, and starts to listen for requests * Initializes the listener thread, pinned to core 0, and starts to listen for requests
*/ */
@ -31,15 +35,14 @@ listener_thread_initialize(void)
listener_thread_epoll_file_descriptor = epoll_create1(0); listener_thread_epoll_file_descriptor = epoll_create1(0);
assert(listener_thread_epoll_file_descriptor >= 0); assert(listener_thread_epoll_file_descriptor >= 0);
pthread_t listener_thread; int ret = pthread_create(&listener_thread_id, NULL, listener_thread_main, NULL);
int ret = pthread_create(&listener_thread, NULL, listener_thread_main, NULL);
assert(ret == 0); assert(ret == 0);
ret = pthread_setaffinity_np(listener_thread, sizeof(cpu_set_t), &cs); ret = pthread_setaffinity_np(listener_thread_id, sizeof(cpu_set_t), &cs);
assert(ret == 0); assert(ret == 0);
ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs); ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs);
assert(ret == 0); assert(ret == 0);
printf("\tListener core thread: %lx\n", listener_thread); printf("\tListener core thread: %lx\n", listener_thread_id);
} }
/** /**

@ -84,6 +84,31 @@ err:
goto done; goto done;
} }
/**
* Sets the HTTP Request and Response Headers and Content type on a module
* @param module
* @param request_count
* @param request_headers
* @param request_content_type
* @param response_count
* @param response_headers
* @param response_content_type
*/
static inline void
module_set_http_info(struct module *module, int request_count, char *request_headers, char request_content_type[],
int response_count, char *response_headers, char response_content_type[])
{
assert(module);
module->request_header_count = request_count;
memcpy(module->request_headers, request_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT);
strcpy(module->request_content_type, request_content_type);
module->response_header_count = response_count;
memcpy(module->response_headers, response_headers, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT);
strcpy(module->response_content_type, response_content_type);
}
/*************************************** /***************************************
* Public Methods * Public Methods
***************************************/ ***************************************/

@ -1,4 +1,7 @@
#include <errno.h>
#include "module_database.h" #include "module_database.h"
#include "panic.h"
/******************* /*******************
* Module Database * * Module Database *
@ -7,6 +10,31 @@
struct module *module_database[MODULE_MAX_MODULE_COUNT] = { NULL }; struct module *module_database[MODULE_MAX_MODULE_COUNT] = { NULL };
size_t module_database_count = 0; size_t module_database_count = 0;
/**
* Adds a module to the in-memory module DB
* @param module module to add
* @return 0 on success. -ENOSPC when full
*/
int
module_database_add(struct module *module)
{
assert(module_database_count <= MODULE_MAX_MODULE_COUNT);
int rc;
if (module_database_count == MODULE_MAX_MODULE_COUNT) goto err_no_space;
module_database[module_database_count++] = module;
rc = 0;
done:
return rc;
err_no_space:
panic("Cannot add module. Database is full.\n");
rc = -ENOSPC;
goto done;
}
/** /**
* Given a name, find the associated module * Given a name, find the associated module
* @param name * @param name

@ -5,6 +5,7 @@
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include "listener_thread.h"
#include "panic.h" #include "panic.h"
#include "priority_queue.h" #include "priority_queue.h"
@ -55,7 +56,7 @@ priority_queue_is_empty(struct priority_queue *self)
{ {
assert(self != NULL); assert(self != NULL);
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
assert(!runtime_is_worker() || !software_interrupt_is_enabled()); assert(listener_thread_is_running() || !software_interrupt_is_enabled());
return self->size == 0; return self->size == 0;
} }
@ -133,7 +134,7 @@ priority_queue_percolate_down(struct priority_queue *self, int parent_index)
assert(self != NULL); assert(self != NULL);
assert(self->get_priority_fn != NULL); assert(self->get_priority_fn != NULL);
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
assert(runtime_is_worker()); assert(!listener_thread_is_running());
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
bool update_highest_value = parent_index == 1; bool update_highest_value = parent_index == 1;
@ -179,7 +180,6 @@ struct priority_queue *
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn) priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn)
{ {
assert(get_priority_fn != NULL); assert(get_priority_fn != NULL);
assert(!runtime_is_worker() || !software_interrupt_is_enabled());
/* Add one to capacity because this data structure ignores the element at 0 */ /* Add one to capacity because this data structure ignores the element at 0 */
size_t one_based_capacity = capacity + 1; size_t one_based_capacity = capacity + 1;
@ -207,7 +207,7 @@ void
priority_queue_free(struct priority_queue *self) priority_queue_free(struct priority_queue *self)
{ {
assert(self != NULL); assert(self != NULL);
assert(!runtime_is_worker() || !software_interrupt_is_enabled()); assert(listener_thread_is_running() || !software_interrupt_is_enabled());
free(self); free(self);
} }
@ -220,7 +220,7 @@ int
priority_queue_length_nolock(struct priority_queue *self) priority_queue_length_nolock(struct priority_queue *self)
{ {
assert(self != NULL); assert(self != NULL);
assert(runtime_is_worker()); assert(!listener_thread_is_running());
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
@ -250,7 +250,7 @@ priority_queue_enqueue_nolock(struct priority_queue *self, void *value)
{ {
assert(self != NULL); assert(self != NULL);
assert(value != NULL); assert(value != NULL);
assert(!runtime_is_worker() || !software_interrupt_is_enabled()); assert(listener_thread_is_running() || !software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
int rc; int rc;
@ -294,7 +294,7 @@ priority_queue_delete_nolock(struct priority_queue *self, void *value)
{ {
assert(self != NULL); assert(self != NULL);
assert(value != NULL); assert(value != NULL);
assert(runtime_is_worker()); assert(!listener_thread_is_running());
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
@ -361,7 +361,7 @@ priority_queue_dequeue_if_earlier_nolock(struct priority_queue *self, void **deq
assert(self != NULL); assert(self != NULL);
assert(dequeued_element != NULL); assert(dequeued_element != NULL);
assert(self->get_priority_fn != NULL); assert(self->get_priority_fn != NULL);
assert(runtime_is_worker()); assert(!listener_thread_is_running());
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
@ -414,7 +414,7 @@ priority_queue_top_nolock(struct priority_queue *self, void **dequeued_element)
assert(self != NULL); assert(self != NULL);
assert(dequeued_element != NULL); assert(dequeued_element != NULL);
assert(self->get_priority_fn != NULL); assert(self->get_priority_fn != NULL);
assert(runtime_is_worker()); assert(!listener_thread_is_running());
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock)); assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));

@ -1,3 +1,4 @@
#include <arpa/inet.h>
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
#include <signal.h> #include <signal.h>
@ -234,6 +235,26 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox)
return 0; return 0;
} }
/**
* Initializes and returns an IO handle of a sandbox ready for use
* @param sandbox
* @param file_descriptor what we'll set on the IO handle after initialization
* @return index of handle we preopened or -1 if all io_handles are exhausted
*/
static inline int
sandbox_initialize_io_handle_and_set_file_descriptor(struct sandbox *sandbox, int file_descriptor)
{
if (!sandbox) return -1;
if (file_descriptor < 0) return file_descriptor;
int io_handle_index = sandbox_initialize_io_handle(sandbox);
if (io_handle_index != -1) {
sandbox->io_handles[io_handle_index].file_descriptor =
file_descriptor; /* per sandbox, so synchronization necessary! */
}
return io_handle_index;
}
static inline void static inline void
sandbox_open_http(struct sandbox *sandbox) sandbox_open_http(struct sandbox *sandbox)
{ {
@ -268,19 +289,85 @@ sandbox_initialize_io_handles_and_file_descriptors(struct sandbox *sandbox)
assert(f == 2); assert(f == 2);
} }
/**
* Prints key performance metrics for a sandbox to runtime_sandbox_perf_log
* This is defined by an environment variable
* @param sandbox
*/
void
sandbox_print_perf(struct sandbox *sandbox)
{
/* If the log was not defined by an environment variable, early out */
if (runtime_sandbox_perf_log == NULL) return;
uint32_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz;
uint32_t queued_us = (sandbox->allocation_timestamp - sandbox->request_arrival_timestamp)
/ runtime_processor_speed_MHz;
uint32_t initializing_us = sandbox->initializing_duration / runtime_processor_speed_MHz;
uint32_t runnable_us = sandbox->runnable_duration / runtime_processor_speed_MHz;
uint32_t running_us = sandbox->running_duration / runtime_processor_speed_MHz;
uint32_t blocked_us = sandbox->blocked_duration / runtime_processor_speed_MHz;
uint32_t returned_us = sandbox->returned_duration / runtime_processor_speed_MHz;
/*
* Assumption: A sandbox is never able to free pages. If linear memory management
* becomes more intelligent, then peak linear memory size needs to be tracked
* seperately from current linear memory size.
*/
fprintf(runtime_sandbox_perf_log, "%lu,%s():%d,%s,%u,%u,%u,%u,%u,%u,%u,%u,%u\n", sandbox->id,
sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state),
sandbox->module->relative_deadline_us, total_time_us, queued_us, initializing_us, runnable_us,
running_us, blocked_us, returned_us, sandbox->linear_memory_size);
}
void
sandbox_summarize_page_allocations(struct sandbox *sandbox)
{
#ifdef LOG_SANDBOX_MEMORY_PROFILE
// TODO: Handle interleavings
char sandbox_page_allocations_log_path[100] = {};
sandbox_page_allocations_log_path[99] = '\0';
snprintf(sandbox_page_allocations_log_path, 99, "%s_%d_page_allocations.csv", sandbox->module->name,
sandbox->module->port);
debuglog("Logging to %s", sandbox_page_allocations_log_path);
FILE *sandbox_page_allocations_log = fopen(sandbox_page_allocations_log_path, "a");
fprintf(sandbox_page_allocations_log, "%lu,%lu,%s,", sandbox->id, sandbox->running_duration,
sandbox_state_stringify(sandbox->state));
for (size_t i = 0; i < sandbox->page_allocation_timestamps_size; i++)
fprintf(sandbox_page_allocations_log, "%u,", sandbox->page_allocation_timestamps[i]);
fprintf(sandbox_page_allocations_log, "\n");
#else
return;
#endif
}
void
sandbox_close_http(struct sandbox *sandbox)
{
assert(sandbox != NULL);
int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, sandbox->client_socket_descriptor, NULL);
if (unlikely(rc < 0)) panic_err();
client_socket_close(sandbox->client_socket_descriptor, &sandbox->client_address);
}
/** /**
* Sandbox execution logic * Sandbox execution logic
* Handles setup, request parsing, WebAssembly initialization, function execution, response building and * Handles setup, request parsing, WebAssembly initialization, function execution, response building and
* sending, and cleanup * sending, and cleanup
*/ */
void void
current_sandbox_main(void) sandbox_start(void)
{ {
struct sandbox *sandbox = current_sandbox_get(); struct sandbox *sandbox = current_sandbox_get();
assert(sandbox != NULL); assert(sandbox != NULL);
assert(sandbox->state == SANDBOX_RUNNING); assert(sandbox->state == SANDBOX_RUNNING);
int rc;
char *error_message = ""; char *error_message = "";
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
@ -292,8 +379,7 @@ current_sandbox_main(void)
sandbox_open_http(sandbox); sandbox_open_http(sandbox);
/* Parse the request */ /* Parse the request */
rc = sandbox_receive_and_parse_client_request(sandbox); if (sandbox_receive_and_parse_client_request(sandbox) < 0) {
if (rc < 0) {
error_message = "Unable to receive and parse client request\n"; error_message = "Unable to receive and parse client request\n";
goto err; goto err;
}; };
@ -313,8 +399,7 @@ current_sandbox_main(void)
sandbox->completion_timestamp = __getcycles(); sandbox->completion_timestamp = __getcycles();
/* Retrieve the result, construct the HTTP response, and send to client */ /* Retrieve the result, construct the HTTP response, and send to client */
rc = sandbox_build_and_send_client_response(sandbox); if (sandbox_build_and_send_client_response(sandbox) < 0) {
if (rc < 0) {
error_message = "Unable to build and send client response\n"; error_message = "Unable to build and send client response\n";
goto err; goto err;
}; };
@ -473,8 +558,7 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand
/* Initialize the sandbox's context, stack, and instruction pointer */ /* Initialize the sandbox's context, stack, and instruction pointer */
/* stack_start points to the bottom of the usable stack, so add stack_size to get to top */ /* stack_start points to the bottom of the usable stack, so add stack_size to get to top */
arch_context_init(&sandbox->ctxt, (reg_t)current_sandbox_main, arch_context_init(&sandbox->ctxt, (reg_t)sandbox_start, (reg_t)sandbox->stack_start + sandbox->stack_size);
(reg_t)sandbox->stack_start + sandbox->stack_size);
/* Initialize file descriptors to -1 */ /* Initialize file descriptors to -1 */
for (int i = 0; i < SANDBOX_MAX_IO_HANDLE_COUNT; i++) sandbox->io_handles[i].file_descriptor = -1; for (int i = 0; i < SANDBOX_MAX_IO_HANDLE_COUNT; i++) sandbox->io_handles[i].file_descriptor = -1;

@ -11,6 +11,7 @@
#include "current_sandbox.h" #include "current_sandbox.h"
#include "debuglog.h" #include "debuglog.h"
#include "global_request_scheduler.h" #include "global_request_scheduler.h"
#include "listener_thread.h"
#include "local_runqueue.h" #include "local_runqueue.h"
#include "module.h" #include "module.h"
#include "panic.h" #include "panic.h"
@ -169,7 +170,7 @@ static inline void
software_interrupt_validate_worker() software_interrupt_validate_worker()
{ {
#ifndef NDEBUG #ifndef NDEBUG
if (!runtime_is_worker()) panic("A non-worker thread has unexpectedly received a signal!"); if (listener_thread_is_running()) panic("The listener thread unexpectedly received a signal!");
#endif #endif
} }

@ -341,9 +341,8 @@ worker_thread_main(void *argument)
* releases the linear memory, and then returns to the base context * releases the linear memory, and then returns to the base context
*/ */
__attribute__((noreturn)) void __attribute__((noreturn)) void
worker_thread_on_sandbox_exit(struct sandbox *exiting_sandbox) worker_thread_on_sandbox_exit()
{ {
assert(exiting_sandbox);
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
generic_thread_dump_lock_overhead(); generic_thread_dump_lock_overhead();
worker_thread_switch_to_base_context(); worker_thread_switch_to_base_context();

Loading…
Cancel
Save