feat: Use integers for admissions control

main
Sean McBride 4 years ago
parent c3dbe76173
commit 10ad100847

@ -2,20 +2,18 @@
#include <pthread.h> #include <pthread.h>
#include <sys/epoll.h> /* for epoll_create1(), epoll_ctl(), struct epoll_event */ #include <sys/epoll.h> /* for epoll_create1(), epoll_ctl(), struct epoll_event */
#include <stdatomic.h>
#include <stdbool.h> #include <stdbool.h>
#include "types.h" #include "types.h"
#if defined(LOG_TOTAL_REQS_RESPS) || defined(LOG_SANDBOX_TOTALS)
#include <stdatomic.h>
#endif
#define LISTENER_THREAD_CORE_ID 0 /* Dedicated Listener Core */ #define LISTENER_THREAD_CORE_ID 0 /* Dedicated Listener Core */
#define LISTENER_THREAD_MAX_EPOLL_EVENTS 128 #define LISTENER_THREAD_MAX_EPOLL_EVENTS 128
#define RUNTIME_LOG_FILE "awesome.log" #define RUNTIME_LOG_FILE "awesome.log"
#define RUNTIME_MAX_SANDBOX_REQUEST_COUNT (1 << 19) /* random! */ #define RUNTIME_MAX_SANDBOX_REQUEST_COUNT (1 << 19) /* random! */
#define RUNTIME_READ_WRITE_VECTOR_LENGTH 16 #define RUNTIME_READ_WRITE_VECTOR_LENGTH 16
#define RUNTIME_GRANULARITY 100000
/* /*
* Descriptor of the epoll instance used to monitor the socket descriptors of registered * Descriptor of the epoll instance used to monitor the socket descriptors of registered
@ -32,6 +30,7 @@ extern float runtime_processor_speed_MHz;
/* Count of worker threads and array of their pthread identifiers */ /* Count of worker threads and array of their pthread identifiers */
extern pthread_t runtime_worker_threads[]; extern pthread_t runtime_worker_threads[];
extern uint32_t runtime_worker_threads_count; extern uint32_t runtime_worker_threads_count;
extern uint64_t runtime_admissions_capacity;
#ifdef LOG_TOTAL_REQS_RESPS #ifdef LOG_TOTAL_REQS_RESPS
/* Counts to track requests and responses */ /* Counts to track requests and responses */
@ -64,7 +63,7 @@ extern _Atomic uint32_t runtime_total_complete_sandboxes;
* These estimates are incremented on request acceptance and decremented on request completion (either * These estimates are incremented on request acceptance and decremented on request completion (either
* success or failure) * success or failure)
*/ */
extern double runtime_admitted; extern _Atomic uint64_t runtime_admitted;
void alloc_linear_memory(void); void alloc_linear_memory(void);
void expand_memory(void); void expand_memory(void);

@ -83,9 +83,9 @@ struct sandbox {
/* /*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request * Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) / relative deadline (cycles) * Calculated by estimated execution time (cycles) * runtime_admissions_granularity / relative deadline (cycles)
*/ */
double admissions_estimate; uint64_t admissions_estimate;
struct module *module; /* the module this is an instance of */ struct module *module; /* the module this is an instance of */

@ -17,9 +17,9 @@ struct sandbox_request {
/* /*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request * Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) / relative deadline (cycles) * Calculated by estimated execution time (cycles) * runtime_admissions_granularity / relative deadline (cycles)
*/ */
double admissions_estimate; uint64_t admissions_estimate;
}; };
DEQUE_PROTOTYPE(sandbox, struct sandbox_request *); DEQUE_PROTOTYPE(sandbox, struct sandbox_request *);
@ -36,7 +36,7 @@ DEQUE_PROTOTYPE(sandbox, struct sandbox_request *);
static inline struct sandbox_request * static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor, sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor,
const struct sockaddr *socket_address, uint64_t request_arrival_timestamp, const struct sockaddr *socket_address, uint64_t request_arrival_timestamp,
double admissions_estimate) uint64_t admissions_estimate)
{ {
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request)); struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
assert(sandbox_request); assert(sandbox_request);

@ -19,7 +19,8 @@
**************************/ **************************/
int runtime_epoll_file_descriptor; int runtime_epoll_file_descriptor;
double runtime_admitted; _Atomic uint64_t runtime_admitted;
uint64_t runtime_admissions_capacity;
#ifdef LOG_TOTAL_REQS_RESPS #ifdef LOG_TOTAL_REQS_RESPS
_Atomic uint32_t runtime_total_requests = 0; _Atomic uint32_t runtime_total_requests = 0;
@ -93,6 +94,8 @@ runtime_initialize(void)
/* Initialize http_parser_settings global */ /* Initialize http_parser_settings global */
http_parser_settings_initialize(); http_parser_settings_initialize();
/* Initialize admissions control state */
runtime_admissions_capacity = runtime_worker_threads_count * RUNTIME_GRANULARITY;
runtime_admitted = 0; runtime_admitted = 0;
} }
@ -234,9 +237,10 @@ listener_thread_main(void *dummy)
*/ */
if (estimated_execution == -1) estimated_execution = 1000; if (estimated_execution == -1) estimated_execution = 1000;
double admissions_estimate = (double)estimated_execution / module->relative_deadline; uint64_t admissions_estimate = (((uint64_t)estimated_execution) * RUNTIME_GRANULARITY)
/ module->relative_deadline;
if (runtime_admitted + admissions_estimate >= runtime_worker_threads_count) { if (runtime_admitted + admissions_estimate >= runtime_admissions_capacity) {
listener_thread_reject(client_socket); listener_thread_reject(client_socket);
continue; continue;
} }
@ -254,7 +258,8 @@ listener_thread_main(void *dummy)
runtime_admitted += admissions_estimate; runtime_admitted += admissions_estimate;
#ifdef LOG_ADMISSIONS_CONTROL #ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %f / %u\n", runtime_admitted, runtime_worker_threads_count); debuglog("Runtime Admitted: %lu / %lu\n", runtime_admitted,
runtime_admissions_capacity);
#endif #endif
} /* while true */ } /* while true */
} /* for loop */ } /* for loop */

@ -838,10 +838,13 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox_print_perf(sandbox); sandbox_print_perf(sandbox);
#endif #endif
/* Assumption: Should never underflow */
assert(runtime_admitted >= sandbox->admissions_estimate);
runtime_admitted -= sandbox->admissions_estimate; runtime_admitted -= sandbox->admissions_estimate;
#ifdef LOG_ADMISSIONS_CONTROL #ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %f / %u\n", runtime_admitted, runtime_worker_threads_count); debuglog("Runtime Admitted: %lu / %lu\n", runtime_admitted, runtime_admissions_capacity);
#endif #endif
/* Do not touch sandbox state after adding to the completion queue to avoid use-after-free bugs */ /* Do not touch sandbox state after adding to the completion queue to avoid use-after-free bugs */
@ -893,10 +896,13 @@ sandbox_set_as_complete(struct sandbox *sandbox, sandbox_state_t last_state)
*/ */
perf_window_add(&sandbox->module->perf_window, sandbox->running_duration); perf_window_add(&sandbox->module->perf_window, sandbox->running_duration);
/* Assumption: Should never underflow */
assert(runtime_admitted >= sandbox->admissions_estimate);
runtime_admitted -= sandbox->admissions_estimate; runtime_admitted -= sandbox->admissions_estimate;
#ifdef LOG_ADMISSIONS_CONTROL #ifdef LOG_ADMISSIONS_CONTROL
debuglog("Runtime Admitted: %f / %u\n", runtime_admitted, runtime_worker_threads_count); debuglog("Runtime Admitted: %lu / %lu\n", runtime_admitted, runtime_admissions_capacity);
#endif #endif
#ifdef LOG_SANDBOX_PERF #ifdef LOG_SANDBOX_PERF

Loading…
Cancel
Save