feat: Initial MVP of admissions control

main
Sean McBride 4 years ago
parent abfb9b18e4
commit 76ba308c16

@ -29,6 +29,18 @@ extern float runtime_processor_speed_MHz;
extern pthread_t runtime_worker_threads[]; extern pthread_t runtime_worker_threads[];
extern uint32_t runtime_worker_threads_count; extern uint32_t runtime_worker_threads_count;
/*
* Unitless estimate of the instantaneous fraction of system capacity required to complete all previously
* admitted work. This is used to calculate free capacity as part of admissions control
*
* The estimated requirements of a single admitted request is calculated as
* estimated execution time (cycles) / relative deadline (cycles)
*
* These estimates are incremented on request acceptance and decremented on request completion (either
* success or failure)
*/
extern double runtime_admitted;
void alloc_linear_memory(void); void alloc_linear_memory(void);
void expand_memory(void); void expand_memory(void);
INLINE char *get_function_from_table(uint32_t idx, uint32_t type_id); INLINE char *get_function_from_table(uint32_t idx, uint32_t type_id);

@ -81,6 +81,12 @@ struct sandbox {
uint64_t absolute_deadline; uint64_t absolute_deadline;
uint64_t total_time; /* From Request to Response */ uint64_t total_time; /* From Request to Response */
/*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) / relative deadline (cycles)
*/
double admissions_estimate;
struct module *module; /* the module this is an instance of */ struct module *module; /* the module this is an instance of */
int32_t arguments_offset; /* actual placement of arguments in the sandbox. */ int32_t arguments_offset; /* actual placement of arguments in the sandbox. */

@ -14,6 +14,12 @@ struct sandbox_request {
struct sockaddr *socket_address; struct sockaddr *socket_address;
uint64_t request_arrival_timestamp; /* cycles */ uint64_t request_arrival_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */ uint64_t absolute_deadline; /* cycles */
/*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) / relative deadline (cycles)
*/
double admissions_estimate;
}; };
DEQUE_PROTOTYPE(sandbox, struct sandbox_request *); DEQUE_PROTOTYPE(sandbox, struct sandbox_request *);
@ -29,7 +35,8 @@ DEQUE_PROTOTYPE(sandbox, struct sandbox_request *);
*/ */
static inline struct sandbox_request * static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor, sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor,
const struct sockaddr *socket_address, uint64_t request_arrival_timestamp) const struct sockaddr *socket_address, uint64_t request_arrival_timestamp,
double admissions_estimate)
{ {
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request)); struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
assert(sandbox_request); assert(sandbox_request);
@ -39,6 +46,7 @@ sandbox_request_allocate(struct module *module, char *arguments, int socket_desc
sandbox_request->socket_address = (struct sockaddr *)socket_address; sandbox_request->socket_address = (struct sockaddr *)socket_address;
sandbox_request->request_arrival_timestamp = request_arrival_timestamp; sandbox_request->request_arrival_timestamp = request_arrival_timestamp;
sandbox_request->absolute_deadline = request_arrival_timestamp + module->relative_deadline; sandbox_request->absolute_deadline = request_arrival_timestamp + module->relative_deadline;
sandbox_request->admissions_estimate = admissions_estimate;
debuglog("Allocating %lu of %s:%d\n", sandbox_request->request_arrival_timestamp, sandbox_request->module->name, debuglog("Allocating %lu of %s:%d\n", sandbox_request->request_arrival_timestamp, sandbox_request->module->name,
sandbox_request->module->port); sandbox_request->module->port);

@ -16,7 +16,8 @@
* Shared Process State * * Shared Process State *
**************************/ **************************/
int runtime_epoll_file_descriptor; int runtime_epoll_file_descriptor;
double runtime_admitted;
/****************************************** /******************************************
* Shared Process / Listener Thread Logic * * Shared Process / Listener Thread Logic *
@ -44,6 +45,8 @@ runtime_initialize(void)
/* Initialize http_parser_settings global */ /* Initialize http_parser_settings global */
http_parser_settings_initialize(); http_parser_settings_initialize();
runtime_admitted = 0;
} }
/************************* /*************************
@ -92,14 +95,45 @@ listener_thread_main(void *dummy)
} }
total_requests++; total_requests++;
/* Perform Admission Control */
/*
* TODO: Enhance to use configurable percentiles rather than just mean. This can be policy
* defined in the module specification
*/
uint64_t estimated_execution = perf_window_get_mean(&module->perf_window);
/*
* If this is the first execution, assume a default execution
* TODO: Enhance module specification to provide "seed" value of estimated duration
* TODO: Should we "rate limit" or only admit one request before we have actual data? Otherwise
* we might be flooded with sandboxes that possibly underestimate
*/
if (estimated_execution == -1) estimated_execution = 1000;
double admissions_estimate = (double)estimated_execution / module->relative_deadline;
/*
* Reject Requests that exceed system capacity
* TODO: Enhance to gracefully return HTTP status code 503 Service Unavailable
*/
if (runtime_admitted + admissions_estimate >= runtime_worker_threads_count) {
debuglog("Would have rejected!");
}
/* Allocate a Sandbox Request */ /* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request = struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, module->name, socket_descriptor, sandbox_request_allocate(module, module->name, socket_descriptor,
(const struct sockaddr *)&client_address, request_arrival_timestamp); (const struct sockaddr *)&client_address, request_arrival_timestamp,
admissions_estimate);
assert(sandbox_request); assert(sandbox_request);
/* Add to the Global Sandbox Request Scheduler */ /* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request); global_request_scheduler_add(sandbox_request);
/* Add to work accepted by the runtime */
runtime_admitted += admissions_estimate;
debuglog("Runtime Utilization: %f%%\n", runtime_admitted / runtime_worker_threads_count * 100);
} }
} }

@ -454,6 +454,8 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand
debuglog("Sandbox %lu | Uninitialized => Initialized\n", sandbox->request_arrival_timestamp); debuglog("Sandbox %lu | Uninitialized => Initialized\n", sandbox->request_arrival_timestamp);
sandbox->admissions_estimate = sandbox_request->admissions_estimate;
sandbox->request_arrival_timestamp = sandbox_request->request_arrival_timestamp; sandbox->request_arrival_timestamp = sandbox_request->request_arrival_timestamp;
sandbox->allocation_timestamp = allocation_timestamp; sandbox->allocation_timestamp = allocation_timestamp;
sandbox->last_state_change_timestamp = allocation_timestamp; sandbox->last_state_change_timestamp = allocation_timestamp;
@ -731,6 +733,11 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox_print_perf(sandbox); sandbox_print_perf(sandbox);
runtime_admitted -= sandbox->admissions_estimate;
assert(runtime_admitted >= 0);
debuglog("Runtime Utilization: %f%%\n", runtime_admitted / runtime_worker_threads_count * 100);
/* Do not touch sandbox state after adding to the completion queue to avoid use-after-free bugs */ /* Do not touch sandbox state after adding to the completion queue to avoid use-after-free bugs */
local_completion_queue_add(sandbox); local_completion_queue_add(sandbox);
} }
@ -768,6 +775,17 @@ sandbox_set_as_complete(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox->last_state_change_timestamp = now; sandbox->last_state_change_timestamp = now;
sandbox->state = SANDBOX_COMPLETE; sandbox->state = SANDBOX_COMPLETE;
/*
* TODO: Enhance to include "spinning" or better "local|global scheduling latency" as well.
* Given the async I/O model of libuv, it is ambiguous how to model "spinning"
*/
perf_window_add(&sandbox->module->perf_window, sandbox->running_duration);
runtime_admitted -= sandbox->admissions_estimate;
assert(runtime_admitted >= 0);
debuglog("Runtime Utilization: %f%%\n", runtime_admitted / runtime_worker_threads_count * 100);
sandbox_print_perf(sandbox); sandbox_print_perf(sandbox);
/* Do not touch sandbox state after adding to the completion queue to avoid use-after-free bugs */ /* Do not touch sandbox state after adding to the completion queue to avoid use-after-free bugs */

Loading…
Cancel
Save