From 34d91cfa21bebfc9efb8dfd26126ffdd707425b2 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Sun, 5 Apr 2020 21:16:17 -0400 Subject: [PATCH] refactor: wrap existing request queue --- runtime/http-parser | 1 + runtime/include/priority_queue.h | 32 ++++ runtime/include/runtime.h | 2 - runtime/include/sandbox_request.h | 75 +-------- runtime/include/sandbox_request_queue.h | 10 ++ runtime/jsmn | 1 + runtime/src/priority_queue.c | 207 ++++++++++++++++++++++++ runtime/src/runtime.c | 17 +- runtime/src/sandbox_request_queue.c | 137 ++++++++++++++++ 9 files changed, 395 insertions(+), 87 deletions(-) create mode 160000 runtime/http-parser create mode 100644 runtime/include/priority_queue.h create mode 100644 runtime/include/sandbox_request_queue.h create mode 160000 runtime/jsmn create mode 100644 runtime/src/priority_queue.c create mode 100644 runtime/src/sandbox_request_queue.c diff --git a/runtime/http-parser b/runtime/http-parser new file mode 160000 index 0000000..28f3c35 --- /dev/null +++ b/runtime/http-parser @@ -0,0 +1 @@ +Subproject commit 28f3c35c215ffbe0241685901338fad484660454 diff --git a/runtime/include/priority_queue.h b/runtime/include/priority_queue.h new file mode 100644 index 0000000..08d93c4 --- /dev/null +++ b/runtime/include/priority_queue.h @@ -0,0 +1,32 @@ +#ifndef PRIORITY_QUEUE_H +#define PRIORITY_QUEUE_H + +#include + +#define MAX 4096 + +/** + * How to get the priority out of the generic element + * We assume priority is expressed as an unsigned 64-bit integer (i.e. cycles or + * UNIX time in ms). 
This is used to maintain a read replica of the highest + * priority element + * @param element + * @returns priority (a u64) + **/ +typedef unsigned long long int (*priority_queue_get_priority_t)(void *element); + +// We assume that priority is expressed in terms of a 64 bit unsigned integral +struct priority_queue { + ck_spinlock_fas_t lock; + unsigned long long int highest_priority; + void * items[MAX]; + int first_free; + priority_queue_get_priority_t get_priority; +}; + +void priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_t get_priority); +int priority_queue_enqueue(struct priority_queue *self, void *value); +void *priority_queue_dequeue(struct priority_queue *self); +int priority_queue_length(struct priority_queue *self); + +#endif /* PRIORITY_QUEUE_H */ \ No newline at end of file diff --git a/runtime/include/runtime.h b/runtime/include/runtime.h index 0ef495e..bd9d5ba 100644 --- a/runtime/include/runtime.h +++ b/runtime/include/runtime.h @@ -9,8 +9,6 @@ #include "types.h" extern int runtime_epoll_file_descriptor; -extern struct deque_sandbox *runtime_global_deque; -extern pthread_mutex_t runtime_global_deque_mutex; extern __thread uv_loop_t worker_thread_uvio_handle; void alloc_linear_memory(void); diff --git a/runtime/include/sandbox_request.h b/runtime/include/sandbox_request.h index 4816a1c..ffae5a2 100644 --- a/runtime/include/sandbox_request.h +++ b/runtime/include/sandbox_request.h @@ -16,28 +16,6 @@ typedef struct sandbox_request sandbox_request_t; DEQUE_PROTOTYPE(sandbox, sandbox_request_t *); -/** - * Pushes a sandbox request to the global deque - * @param sandbox_request - **/ -static inline int -sandbox_request_push_to_dequeue(sandbox_request_t *sandbox_request) -{ - int return_code; - -// TODO: Running the runtime and listener cores on a single shared core is untested -// We are unsure if the locking behavior is correct, so there may be deadlocks -#if NCORES == 1 - 
pthread_mutex_lock(&runtime_global_deque_mutex); -#endif - return_code = deque_push_sandbox(runtime_global_deque, &sandbox_request); -#if NCORES == 1 - pthread_mutex_unlock(&runtime_global_deque_mutex); -#endif - - return return_code; -} - /** * Allocates a new Sandbox Request and places it on the Global Deque * @param module the module we want to request @@ -49,7 +27,7 @@ sandbox_request_push_to_dequeue(sandbox_request_t *sandbox_request) **/ static inline sandbox_request_t * sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor, - const struct sockaddr *socket_address, u64 start_time) + const struct sockaddr *socket_address, u64 start_time) { sandbox_request_t *sandbox_request = (sandbox_request_t *)malloc(sizeof(sandbox_request_t)); assert(sandbox_request); @@ -60,57 +38,6 @@ sandbox_request_allocate(struct module *module, char *arguments, int socket_desc sandbox_request->start_time = start_time; debuglog("[%p: %s]\n", sandbox_request, sandbox_request->module->name); - sandbox_request_push_to_dequeue(sandbox_request); - return sandbox_request; -} - -/** - * Pops a sandbox request from the global deque - * @param sandbox_request the pointer which we want to set to the sandbox request - **/ -static inline int -sandbox_request_pop_from_dequeue(sandbox_request_t **sandbox_request) -{ - int return_code; - -// TODO: Running the runtime and listener cores on a single shared core is untested -// We are unsure if the locking behavior is correct, so there may be deadlocks -#if NCORES == 1 - pthread_mutex_lock(&runtime_global_deque_mutex); -#endif - return_code = deque_pop_sandbox(runtime_global_deque, sandbox_request); -#if NCORES == 1 - pthread_mutex_unlock(&runtime_global_deque_mutex); -#endif - return return_code; -} - -/** - * Stealing from the dequeue is a lock-free, cross-core "pop", which removes the element from the end opposite to - * "pop". 
Because the producer and consumer (the core stealine the sandbox request) modify different ends, - * no locks are required, and coordination is achieved by instead retrying on inconsistent indices. - * - * Relevant Read: https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf - * - * TODO: Notice the mutex_lock for NCORES == 1 in both push/pop functions and steal calling 'pop' for NCORES == 1. - * Ideally you don't call steal for same core consumption but I just made the steal API wrap that logic. Which is - * perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core - * and add an assert in "steal" function for NCORES == 1. - * - * @returns A Sandbox Request or NULL - **/ -static inline sandbox_request_t * -sandbox_request_steal_from_dequeue(void) -{ - sandbox_request_t *sandbox_request = NULL; - -#if NCORES == 1 - sandbox_request_pop_from_dequeue(&sandbox_request); -#else - int r = deque_steal_sandbox(runtime_global_deque, &sandbox_request); - if (r) sandbox_request = NULL; -#endif - return sandbox_request; } diff --git a/runtime/include/sandbox_request_queue.h b/runtime/include/sandbox_request_queue.h new file mode 100644 index 0000000..d1ebb43 --- /dev/null +++ b/runtime/include/sandbox_request_queue.h @@ -0,0 +1,10 @@ +#ifndef SFRT_SANDBOX_REQUEST_QUEUE_H +#define SFRT_SANDBOX_REQUEST_QUEUE_H + +#include + +void sandbox_request_queue_initialize(void); +int sandbox_request_queue_add(sandbox_request_t *); +sandbox_request_t *sandbox_request_queue_remove(void); + +#endif /* SFRT_SANDBOX_REQUEST_QUEUE_H */ \ No newline at end of file diff --git a/runtime/jsmn b/runtime/jsmn new file mode 160000 index 0000000..85695f3 --- /dev/null +++ b/runtime/jsmn @@ -0,0 +1 @@ +Subproject commit 85695f3d5903b1cd5b4030efe50db3b4f5f3c928 diff --git a/runtime/src/priority_queue.c b/runtime/src/priority_queue.c new file mode 100644 index 0000000..6f2949a --- /dev/null +++ b/runtime/src/priority_queue.c @@ 
-0,0 +1,207 @@ +#include +#include +#include +#include + +#include + +/**************************** + * Private Helper Functions * + ****************************/ + +/** + * Adds a value to the end of the binary heap + * @param self the priority queue + * @param new_item the value we are adding + * @return 0 on success. -1 when priority queue is full + **/ +static inline int +priority_queue_append(struct priority_queue *self, void *new_item) +{ + assert(self != NULL); + + if (self->first_free >= MAX) return -1; + + self->items[self->first_free] = new_item; + self->first_free++; + return 0; +} + +/** + * Shifts an appended value upwards to restore heap structure property + * @param self the priority queue + */ +static inline void +priority_queue_percolate_up(struct priority_queue *self) +{ + assert(self != NULL); + assert(self->get_priority != NULL); + + for (int i = self->first_free - 1; + i / 2 != 0 && self->get_priority(self->items[i]) < self->get_priority(self->items[i / 2]); i /= 2) { + void *temp = self->items[i / 2]; + self->items[i / 2] = self->items[i]; + self->items[i] = temp; + // If percolated to highest priority, update highest priority + if (i / 2 == 1) self->highest_priority = self->get_priority(self->items[1]); + } +} + +/** + * Returns the index of a node's smallest child + * @param self the priority queue + * @param parent_index + * @returns the index of the smallest child + */ +static inline int +priority_queue_find_smallest_child(struct priority_queue *self, int parent_index) +{ + assert(self != NULL); + assert(parent_index >= 1 && parent_index < self->first_free); + assert(self->get_priority != NULL); + + int left_child_index = 2 * parent_index; + int right_child_index = 2 * parent_index + 1; + assert(self->items[left_child_index] != NULL); + + // If we don't have a right child or the left child is smaller, return it + if (right_child_index == self->first_free) { + return left_child_index; + } else if 
(self->get_priority(self->items[left_child_index]) + < self->get_priority(self->items[right_child_index])) { + return left_child_index; + } else { + // Otherwise, return the right child + return right_child_index; + } +} + +/** + * Shifts the top of the heap downwards. Used after placing the last value at + * the top + * @param self the priority queue + */ +static inline void +priority_queue_percolate_down(struct priority_queue *self) +{ + assert(self != NULL); + assert(self->get_priority != NULL); + + int parent_index = 1; + int left_child_index = 2 * parent_index; + while (left_child_index >= 2 && left_child_index < self->first_free) { + int smallest_child_index = priority_queue_find_smallest_child(self, parent_index); + // Once the parent is equal to or less than its smallest child, break; + if (self->get_priority(self->items[parent_index]) + <= self->get_priority(self->items[smallest_child_index])) + break; + // Otherwise, swap and continue down the tree + void *temp = self->items[smallest_child_index]; + self->items[smallest_child_index] = self->items[parent_index]; + self->items[parent_index] = temp; + + parent_index = smallest_child_index; + left_child_index = 2 * parent_index; + } +} + +/********************* + * Public API * + *********************/ + +/** + * Initializes the Priority Queue Data structure + * @param self the priority_queue to initialize + * @param get_priority pointer to a function that returns the priority of an + *element + **/ +void +priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_t get_priority) +{ + assert(self != NULL); + assert(get_priority != NULL); + + memset(self->items, 0, sizeof(void *) * MAX); + + ck_spinlock_fas_init(&self->lock); + self->first_free = 1; + self->get_priority = get_priority; + + // We're assuming a min-heap implementation, so set to largest possible value + // (highest_priority is unsigned long long, so the sentinel must be ULLONG_MAX, + // not ULONG_MAX, which truncates on ILP32 targets) + self->highest_priority = ULLONG_MAX; +} + +/** + * @param self the priority_queue + * @returns the number of elements in
the priority queue + **/ +int +priority_queue_length(struct priority_queue *self) +{ + assert(self != NULL); + + return self->first_free - 1; +} + +/** + * @param self - the priority queue we want to add to + * @param value - the value we want to add + * @returns 0 on success. -1 on full. -2 on unable to take lock + **/ +int +priority_queue_enqueue(struct priority_queue *self, void *value) +{ + assert(self != NULL); + int rc; + + if (ck_spinlock_fas_trylock(&self->lock) == false) return -2; + + // Start of Critical Section + if (priority_queue_append(self, value) == 0) { + if (self->first_free > 2) { + priority_queue_percolate_up(self); + } else { + // If this is the first element we add, update the highest priority + self->highest_priority = self->get_priority(value); + } + rc = 0; + } else { + // Priority Queue is full + rc = -1; + } + // End of Critical Section + ck_spinlock_fas_unlock(&self->lock); + return rc; +} + +/** + * @param self - the priority queue we want to add to + * @returns The head of the priority queue or NULL when empty + **/ +void * +priority_queue_dequeue(struct priority_queue *self) +{ + assert(self != NULL); + assert(self->get_priority != NULL); + // If first_free is 1, we're empty + if (self->first_free == 1) return NULL; + + if (ck_spinlock_fas_trylock(&self->lock) == false) return NULL; + // Start of Critical Section + void *min = self->items[1]; + self->items[1] = self->items[self->first_free - 1]; + self->items[self->first_free - 1] = NULL; + self->first_free--; + assert(self->first_free == 1 || self->items[self->first_free - 1] != NULL); + // Because of 1-based indices, first_free is 2 when there is only one element + if (self->first_free > 2) priority_queue_percolate_down(self); + + if (self->first_free > 1) { + self->highest_priority = self->get_priority(self->items[1]); + } else { + // Empty again: reset sentinel (ULLONG_MAX to match the u64 field width) + self->highest_priority = ULLONG_MAX; + } + ck_spinlock_fas_unlock(&self->lock); + // End of Critical Section + return min; +} diff --git 
a/runtime/src/runtime.c b/runtime/src/runtime.c index 9b10d5d..8d22c69 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -27,10 +28,8 @@ * Shared Process State * **************************/ -struct deque_sandbox *runtime_global_deque; -pthread_mutex_t runtime_global_deque_mutex = PTHREAD_MUTEX_INITIALIZER; -int runtime_epoll_file_descriptor; -http_parser_settings runtime_http_parser_settings; +int runtime_epoll_file_descriptor; +http_parser_settings runtime_http_parser_settings; /****************************************** * Shared Process / Listener Thread Logic * @@ -46,10 +45,7 @@ runtime_initialize(void) assert(runtime_epoll_file_descriptor >= 0); // Allocate and Initialize the global deque - runtime_global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); - assert(runtime_global_deque); - // Note: Below is a Macro - deque_init_sandbox(runtime_global_deque, RUNTIME_MAX_SANDBOX_REQUEST_COUNT); + sandbox_request_queue_initialize(); // Mask Signals software_interrupt_mask_signal(SIGUSR1); @@ -106,8 +102,7 @@ listener_thread_main(void *dummy) sandbox_request_allocate(module, module->name, socket_descriptor, (const struct sockaddr *)&client_address, start_time); assert(sandbox_request); - - // TODO: Refactor sandbox_request_allocate to not add to global request queue and do this here + sandbox_request_queue_add(sandbox_request); } } @@ -273,7 +268,7 @@ worker_thread_pull_and_process_sandbox_requests(void) while (total_sandboxes_pulled < SANDBOX_PULL_BATCH_SIZE) { sandbox_request_t *sandbox_request; - if ((sandbox_request = sandbox_request_steal_from_dequeue()) == NULL) break; + if ((sandbox_request = sandbox_request_queue_remove()) == NULL) break; // Actually allocate the sandbox for the requests that we've pulled struct sandbox *sandbox = sandbox_allocate(sandbox_request->module, sandbox_request->arguments, sandbox_request->socket_descriptor, diff --git 
a/runtime/src/sandbox_request_queue.c b/runtime/src/sandbox_request_queue.c new file mode 100644 index 0000000..89b3fae --- /dev/null +++ b/runtime/src/sandbox_request_queue.c @@ -0,0 +1,137 @@ +#include +#include + +enum scheduling_policy +{ + FIFO, + PS +}; + +enum scheduling_policy sandbox_request_queue_policy = FIFO; + +// FIFO Globals +struct deque_sandbox *runtime_global_deque; +pthread_mutex_t runtime_global_deque_mutex = PTHREAD_MUTEX_INITIALIZER; + +// PS Globals +struct priority_queue sandbox_request_queue_ps; + +static inline void +sandbox_request_queue_fifo_initialize(void) +{ + // Allocate and Initialize the global deque + runtime_global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); + assert(runtime_global_deque); + // Note: Below is a Macro + deque_init_sandbox(runtime_global_deque, RUNTIME_MAX_SANDBOX_REQUEST_COUNT); +} + +static inline void +sandbox_request_queue_ps_initialize(void) +{ + // TODO +} + +void +sandbox_request_queue_initialize(void) +{ + switch (sandbox_request_queue_policy) { + case FIFO: + return sandbox_request_queue_fifo_initialize(); + case PS: + return sandbox_request_queue_ps_initialize(); + } +} + +/** + * Pushes a sandbox request to the global deque + * @param sandbox_request + **/ +static inline int +sandbox_request_queue_fifo_add(sandbox_request_t *sandbox_request) +{ + int return_code; + +// TODO: Running the runtime and listener cores on a single shared core is untested +// We are unsure if the locking behavior is correct, so there may be deadlocks +#if NCORES == 1 + pthread_mutex_lock(&runtime_global_deque_mutex); +#endif + return_code = deque_push_sandbox(runtime_global_deque, &sandbox_request); +#if NCORES == 1 + pthread_mutex_unlock(&runtime_global_deque_mutex); +#endif + + return return_code; +} + +static inline int +sandbox_request_queue_ps_add(sandbox_request_t *sandbox_request) +{ + // TODO + return 0; +} + +/** + * Pushes a sandbox request to the global deque + * @param sandbox_request + **/ 
+int +sandbox_request_queue_add(sandbox_request_t *sandbox_request) +{ + switch (sandbox_request_queue_policy) { + case FIFO: + return sandbox_request_queue_fifo_add(sandbox_request); + case PS: + return sandbox_request_queue_ps_add(sandbox_request); + } + // Unreachable with a valid policy; keeps control from falling off a non-void function + return -1; +} + +/** + * Stealing from the dequeue is a lock-free, cross-core "pop", which removes the element from the end opposite to + * "pop". Because the producer and consumer (the core stealing the sandbox request) modify different ends, + * no locks are required, and coordination is achieved by instead retrying on inconsistent indices. + * + * Relevant Read: https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf + * + * TODO: Notice the mutex_lock for NCORES == 1 in both push/pop functions and steal calling 'pop' for NCORES == 1. + * Ideally you don't call steal for same core consumption but I just made the steal API wrap that logic. Which is + * perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core + * and add an assert in "steal" function for NCORES == 1. 
+ * + * @returns A Sandbox Request or NULL + **/ +static inline sandbox_request_t * +sandbox_request_queue_fifo_remove(void) +{ + sandbox_request_t *sandbox_request = NULL; + // Declared outside the #if so both branches (and the check below) see it + int return_code = 0; + +#if NCORES == 1 + pthread_mutex_lock(&runtime_global_deque_mutex); + return_code = deque_pop_sandbox(runtime_global_deque, &sandbox_request); + pthread_mutex_unlock(&runtime_global_deque_mutex); +#else + return_code = deque_steal_sandbox(runtime_global_deque, &sandbox_request); +#endif + if (return_code) sandbox_request = NULL; + return sandbox_request; +} + +static inline sandbox_request_t * +sandbox_request_queue_ps_remove(void) +{ + sandbox_request_t *sandbox_request = NULL; + // TODO + return sandbox_request; +} + +sandbox_request_t * +sandbox_request_queue_remove(void) +{ + switch (sandbox_request_queue_policy) { + case FIFO: + return sandbox_request_queue_fifo_remove(); + case PS: + return sandbox_request_queue_ps_remove(); + } + // Unreachable with a valid policy; keeps control from falling off a non-void function + return NULL; +} \ No newline at end of file