refactor: wrap existing request queue

main
Sean McBride 5 years ago
parent d14af73c30
commit 34d91cfa21

@ -0,0 +1 @@
Subproject commit 28f3c35c215ffbe0241685901338fad484660454

@ -0,0 +1,32 @@
#ifndef PRIORITY_QUEUE_H
#define PRIORITY_QUEUE_H
#include <spinlock/fas.h>
#define MAX 4096
/**
* How to get the priority out of the generic element
* We assume priority is expressed as an unsigned 64-bit integer (i.e. cycles or
* UNIX time in ms). This is used to maintain a read replica of the priority of
* the highest-priority element.
* @param element
* @returns priority (a u64)
**/
typedef unsigned long long int (*priority_queue_get_priority_t)(void *element);
// We assume that priority is expressed as a 64-bit unsigned integer
struct priority_queue {
ck_spinlock_fas_t lock;
unsigned long long int highest_priority;
void * items[MAX];
int first_free;
priority_queue_get_priority_t get_priority;
};
void priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_t get_priority);
int priority_queue_enqueue(struct priority_queue *self, void *value);
void *priority_queue_dequeue(struct priority_queue *self);
int priority_queue_length(struct priority_queue *self);
#endif /* PRIORITY_QUEUE_H */
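
Since the queue stores opaque pointers, each consumer supplies a callback matching priority_queue_get_priority_t. The following minimal usage sketch is not part of this commit; struct task, its deadline field, and main are hypothetical, and error handling is elided:

#include <stdio.h>
#include <priority_queue.h>

// Hypothetical element type; the queue only needs a u64 priority per element
struct task {
	unsigned long long int deadline; // e.g. absolute UNIX time in ms
	const char *name;
};

// Callback matching priority_queue_get_priority_t
unsigned long long int
task_get_priority(void *element)
{
	return ((struct task *)element)->deadline;
}

int
main(void)
{
	struct priority_queue queue;
	priority_queue_initialize(&queue, task_get_priority);

	struct task a = { 200, "later" };
	struct task b = { 100, "sooner" };
	priority_queue_enqueue(&queue, &a); // return codes ignored for brevity
	priority_queue_enqueue(&queue, &b);

	// Min-heap: the smallest priority value dequeues first
	struct task *next = priority_queue_dequeue(&queue);
	printf("%s\n", next->name); // prints "sooner"
	return 0;
}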

@ -9,8 +9,6 @@
#include "types.h" #include "types.h"
extern int runtime_epoll_file_descriptor; extern int runtime_epoll_file_descriptor;
extern struct deque_sandbox *runtime_global_deque;
extern pthread_mutex_t runtime_global_deque_mutex;
extern __thread uv_loop_t worker_thread_uvio_handle;
void alloc_linear_memory(void);

@ -16,28 +16,6 @@ typedef struct sandbox_request sandbox_request_t;
DEQUE_PROTOTYPE(sandbox, sandbox_request_t *);
/**
* Pushes a sandbox request to the global deque
* @param sandbox_request
**/
static inline int
sandbox_request_push_to_dequeue(sandbox_request_t *sandbox_request)
{
int return_code;
// TODO: Running the runtime and listener cores on a single shared core is untested
// We are unsure if the locking behavior is correct, so there may be deadlocks
#if NCORES == 1
pthread_mutex_lock(&runtime_global_deque_mutex);
#endif
return_code = deque_push_sandbox(runtime_global_deque, &sandbox_request);
#if NCORES == 1
pthread_mutex_unlock(&runtime_global_deque_mutex);
#endif
return return_code;
}
/**
* Allocates a new Sandbox Request and places it on the Global Deque
* @param module the module we want to request
@ -49,7 +27,7 @@ sandbox_request_push_to_dequeue(sandbox_request_t *sandbox_request)
**/
static inline sandbox_request_t *
sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor,
const struct sockaddr *socket_address, u64 start_time)
{
sandbox_request_t *sandbox_request = (sandbox_request_t *)malloc(sizeof(sandbox_request_t));
assert(sandbox_request);
@ -60,57 +38,6 @@ sandbox_request_allocate(struct module *module, char *arguments, int socket_desc
sandbox_request->start_time = start_time;
debuglog("[%p: %s]\n", sandbox_request, sandbox_request->module->name);
sandbox_request_push_to_dequeue(sandbox_request);
return sandbox_request;
}
/**
* Pops a sandbox request from the global deque
* @param sandbox_request the pointer which we want to set to the sandbox request
**/
static inline int
sandbox_request_pop_from_dequeue(sandbox_request_t **sandbox_request)
{
int return_code;
// TODO: Running the runtime and listener cores on a single shared core is untested
// We are unsure if the locking behavior is correct, so there may be deadlocks
#if NCORES == 1
pthread_mutex_lock(&runtime_global_deque_mutex);
#endif
return_code = deque_pop_sandbox(runtime_global_deque, sandbox_request);
#if NCORES == 1
pthread_mutex_unlock(&runtime_global_deque_mutex);
#endif
return return_code;
}
/**
* Stealing from the deque is a lock-free, cross-core "pop", which removes the element from the end opposite to
* "pop". Because the producer and consumer (the core stealing the sandbox request) modify different ends,
* no locks are required, and coordination is achieved by instead retrying on inconsistent indices.
*
* Relevant Read: https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf
*
* TODO: Notice the mutex_lock for NCORES == 1 in both push/pop functions and steal calling 'pop' for NCORES == 1.
* Ideally you don't call steal for same core consumption but I just made the steal API wrap that logic. Which is
* perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core
* and add an assert in "steal" function for NCORES == 1.
*
* @returns A Sandbox Request or NULL
**/
static inline sandbox_request_t *
sandbox_request_steal_from_dequeue(void)
{
sandbox_request_t *sandbox_request = NULL;
#if NCORES == 1
sandbox_request_pop_from_dequeue(&sandbox_request);
#else
int r = deque_steal_sandbox(runtime_global_deque, &sandbox_request);
if (r) sandbox_request = NULL;
#endif
return sandbox_request;
}

@ -0,0 +1,10 @@
#ifndef SFRT_SANDBOX_REQUEST_QUEUE_H
#define SFRT_SANDBOX_REQUEST_QUEUE_H
#include <sandbox_request.h>
void sandbox_request_queue_initialize(void);
int sandbox_request_queue_add(sandbox_request_t *);
sandbox_request_t *sandbox_request_queue_remove(void);
#endif /* SFRT_SANDBOX_REQUEST_QUEUE_H */
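
This facade hides the scheduling policy behind three calls. Mirroring the runtime.c changes later in this commit, the intended lifecycle is roughly as follows (a condensed, non-runnable fragment: module, socket_descriptor, client_address, and start_time come from the surrounding listener code):

// Boot (see runtime_initialize in this commit)
sandbox_request_queue_initialize();

// Listener thread (see listener_thread_main): allocate, then enqueue explicitly
sandbox_request_t *request = sandbox_request_allocate(module, module->name, socket_descriptor,
                                                      (const struct sockaddr *)&client_address, start_time);
assert(request);
sandbox_request_queue_add(request);

// Worker thread (see worker_thread_pull_and_process_sandbox_requests): drain until empty
sandbox_request_t *next;
while ((next = sandbox_request_queue_remove()) != NULL) {
	// allocate and schedule a sandbox for next
}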

@ -0,0 +1 @@
Subproject commit 85695f3d5903b1cd5b4030efe50db3b4f5f3c928

@ -0,0 +1,207 @@
#include <assert.h>
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <priority_queue.h>
/****************************
* Private Helper Functions *
****************************/
/**
* Adds a value to the end of the binary heap
* @param self the priority queue
* @param new_item the value we are adding
* @return 0 on success. -1 when priority queue is full
**/
static inline int
priority_queue_append(struct priority_queue *self, void *new_item)
{
assert(self != NULL);
if (self->first_free >= MAX) return -1;
self->items[self->first_free] = new_item;
self->first_free++;
return 0;
}
/**
* Shifts an appended value upwards to restore heap structure property
* @param self the priority queue
*/
static inline void
priority_queue_percolate_up(struct priority_queue *self)
{
assert(self != NULL);
assert(self->get_priority != NULL);
for (int i = self->first_free - 1;
i / 2 != 0 && self->get_priority(self->items[i]) < self->get_priority(self->items[i / 2]); i /= 2) {
void *temp = self->items[i / 2];
self->items[i / 2] = self->items[i];
self->items[i] = temp;
// If percolated to highest priority, update highest priority
if (i / 2 == 1) self->highest_priority = self->get_priority(self->items[1]);
}
}
/**
* Returns the index of a node's smallest child
* @param self the priority queue
* @param parent_index
* @returns the index of the smallest child
*/
static inline int
priority_queue_find_smallest_child(struct priority_queue *self, int parent_index)
{
assert(self != NULL);
assert(parent_index >= 1 && parent_index < self->first_free);
assert(self->get_priority != NULL);
int left_child_index = 2 * parent_index;
int right_child_index = 2 * parent_index + 1;
assert(self->items[left_child_index] != NULL);
// If we don't have a right child or the left child is smaller, return it
if (right_child_index == self->first_free) {
return left_child_index;
} else if (self->get_priority(self->items[left_child_index])
< self->get_priority(self->items[right_child_index])) {
return left_child_index;
} else {
// Otherwise, return the right child
return right_child_index;
}
}
/**
* Shifts the top of the heap downwards. Used after placing the last value at
* the top
* @param self the priority queue
*/
static inline void
priority_queue_percolate_down(struct priority_queue *self)
{
assert(self != NULL);
assert(self->get_priority != NULL);
int parent_index = 1;
int left_child_index = 2 * parent_index;
while (left_child_index >= 2 && left_child_index < self->first_free) {
int smallest_child_index = priority_queue_find_smallest_child(self, parent_index);
// Once the parent is equal to or less than its smallest child, break;
if (self->get_priority(self->items[parent_index])
<= self->get_priority(self->items[smallest_child_index]))
break;
// Otherwise, swap and continue down the tree
void *temp = self->items[smallest_child_index];
self->items[smallest_child_index] = self->items[parent_index];
self->items[parent_index] = temp;
parent_index = smallest_child_index;
left_child_index = 2 * parent_index;
}
}
/*********************
* Public API *
*********************/
/**
* Initializes the Priority Queue data structure
* @param self the priority_queue to initialize
* @param get_priority pointer to a function that returns the priority of an
* element
**/
void
priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_t get_priority)
{
assert(self != NULL);
assert(get_priority != NULL);
memset(self->items, 0, sizeof(void *) * MAX);
ck_spinlock_fas_init(&self->lock);
self->first_free = 1;
self->get_priority = get_priority;
// We're assuming a min-heap implementation, so set to the largest possible value
self->highest_priority = ULLONG_MAX;
}
/**
* @param self the priority_queue
* @returns the number of elements in the priority queue
**/
int
priority_queue_length(struct priority_queue *self)
{
assert(self != NULL);
return self->first_free - 1;
}
/**
* @param self - the priority queue we want to add to
* @param value - the value we want to add
* @returns 0 on success, -1 when the priority queue is full, -2 if unable to take the lock
**/
int
priority_queue_enqueue(struct priority_queue *self, void *value)
{
assert(self != NULL);
int rc;
if (ck_spinlock_fas_trylock(&self->lock) == false) return -2;
// Start of Critical Section
if (priority_queue_append(self, value) == 0) {
if (self->first_free > 2) {
priority_queue_percolate_up(self);
} else {
// If this is the first element we add, update the highest priority
self->highest_priority = self->get_priority(value);
}
rc = 0;
} else {
// Priority Queue is full
rc = -1;
}
// End of Critical Section
ck_spinlock_fas_unlock(&self->lock);
return rc;
}
/**
* @param self - the priority queue we want to remove from
* @returns the head of the priority queue, or NULL when empty or when the lock could not be taken
**/
void *
priority_queue_dequeue(struct priority_queue *self)
{
assert(self != NULL);
assert(self->get_priority != NULL);
// If first_free is 1, we're empty
if (self->first_free == 1) return NULL;
if (ck_spinlock_fas_trylock(&self->lock) == false) return NULL;
// Start of Critical Section
// Re-check emptiness under the lock; a concurrent dequeue may have emptied the queue
if (self->first_free == 1) {
ck_spinlock_fas_unlock(&self->lock);
return NULL;
}
void *min = self->items[1];
self->items[1] = self->items[self->first_free - 1];
self->items[self->first_free - 1] = NULL;
self->first_free--;
assert(self->first_free == 1 || self->items[self->first_free - 1] != NULL);
// Because of 1-based indices, first_free is 2 when there is only one element
if (self->first_free > 2) priority_queue_percolate_down(self);
if (self->first_free > 1) {
self->highest_priority = self->get_priority(self->items[1]);
} else {
self->highest_priority = ULLONG_MAX;
}
ck_spinlock_fas_unlock(&self->lock);
// End of Critical Section
return min;
}
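
As a sanity check on the heap invariants, a standalone test along these lines (not part of this commit; value_get_priority and the values array are test scaffolding) should observe ascending dequeue order:

#include <assert.h>
#include <stddef.h>
#include <priority_queue.h>

// Test scaffolding: elements are pointers to u64 priorities
static unsigned long long int
value_get_priority(void *element)
{
	return *(unsigned long long int *)element;
}

int
main(void)
{
	struct priority_queue queue;
	priority_queue_initialize(&queue, value_get_priority);

	unsigned long long int values[] = { 30, 10, 20 };
	for (int i = 0; i < 3; i++) assert(priority_queue_enqueue(&queue, &values[i]) == 0);
	assert(priority_queue_length(&queue) == 3);

	// Expect ascending order out of the min-heap, then NULL on empty
	assert(*(unsigned long long int *)priority_queue_dequeue(&queue) == 10);
	assert(*(unsigned long long int *)priority_queue_dequeue(&queue) == 20);
	assert(*(unsigned long long int *)priority_queue_dequeue(&queue) == 30);
	assert(priority_queue_dequeue(&queue) == NULL);
	return 0;
}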

@ -20,6 +20,7 @@
#include <module.h>
#include <sandbox.h>
#include <sandbox_request.h>
#include <sandbox_request_queue.h>
#include <software_interrupt.h>
#include <types.h>
@ -27,10 +28,8 @@
* Shared Process State *
**************************/
struct deque_sandbox *runtime_global_deque;
pthread_mutex_t runtime_global_deque_mutex = PTHREAD_MUTEX_INITIALIZER;
int runtime_epoll_file_descriptor;
http_parser_settings runtime_http_parser_settings;
/******************************************
* Shared Process / Listener Thread Logic *
@ -46,10 +45,7 @@ runtime_initialize(void)
assert(runtime_epoll_file_descriptor >= 0);
// Allocate and Initialize the global deque
runtime_global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox));
assert(runtime_global_deque);
// Note: Below is a Macro
deque_init_sandbox(runtime_global_deque, RUNTIME_MAX_SANDBOX_REQUEST_COUNT);
sandbox_request_queue_initialize();
// Mask Signals
software_interrupt_mask_signal(SIGUSR1);
@ -106,8 +102,7 @@ listener_thread_main(void *dummy)
sandbox_request_allocate(module, module->name, socket_descriptor,
(const struct sockaddr *)&client_address, start_time);
assert(sandbox_request);
sandbox_request_queue_add(sandbox_request);
// TODO: Refactor sandbox_request_allocate to not add to global request queue and do this here
}
}
@ -273,7 +268,7 @@ worker_thread_pull_and_process_sandbox_requests(void)
while (total_sandboxes_pulled < SANDBOX_PULL_BATCH_SIZE) {
sandbox_request_t *sandbox_request;
if ((sandbox_request = sandbox_request_steal_from_dequeue()) == NULL) break;
if ((sandbox_request = sandbox_request_queue_remove()) == NULL) break;
// Actually allocate the sandbox for the requests that we've pulled
struct sandbox *sandbox = sandbox_allocate(sandbox_request->module, sandbox_request->arguments,
sandbox_request->socket_descriptor,

@ -0,0 +1,137 @@
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <priority_queue.h>
#include <sandbox_request_queue.h>
enum scheduling_policy
{
FIFO, // first-in, first-out: the existing work-stealing deque
PS    // priority scheduling: intended to be backed by the new priority_queue
};
enum scheduling_policy sandbox_request_queue_policy = FIFO;
// FIFO Globals
struct deque_sandbox *runtime_global_deque;
pthread_mutex_t runtime_global_deque_mutex = PTHREAD_MUTEX_INITIALIZER;
// PS Globals
struct priority_queue sandbox_request_queue_ps;
static inline void
sandbox_request_queue_fifo_initialize(void)
{
// Allocate and Initialize the global deque
runtime_global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox));
assert(runtime_global_deque);
// Note: Below is a Macro
deque_init_sandbox(runtime_global_deque, RUNTIME_MAX_SANDBOX_REQUEST_COUNT);
}
static inline void
sandbox_request_queue_ps_initialize(void)
{
// TODO
}
void
sandbox_request_queue_initialize(void)
{
switch (sandbox_request_queue_policy) {
case FIFO:
return sandbox_request_queue_fifo_initialize();
case PS:
return sandbox_request_queue_ps_initialize();
}
}
/**
* Pushes a sandbox request to the global deque
* @param sandbox_request
**/
static inline int
sandbox_request_queue_fifo_add(sandbox_request_t *sandbox_request)
{
int return_code;
// TODO: Running the runtime and listener cores on a single shared core is untested
// We are unsure if the locking behavior is correct, so there may be deadlocks
#if NCORES == 1
pthread_mutex_lock(&runtime_global_deque_mutex);
#endif
return_code = deque_push_sandbox(runtime_global_deque, &sandbox_request);
#if NCORES == 1
pthread_mutex_unlock(&runtime_global_deque_mutex);
#endif
return return_code;
}
static inline int
sandbox_request_queue_ps_add(sandbox_request_t *sandbox_request)
{
// TODO
return 0;
}
/**
* Adds a sandbox request to the queue of the configured scheduling policy
* @param sandbox_request
**/
int
sandbox_request_queue_add(sandbox_request_t *sandbox_request)
{
switch (sandbox_request_queue_policy) {
case FIFO:
return sandbox_request_queue_fifo_add(sandbox_request);
case PS:
return sandbox_request_queue_ps_add(sandbox_request);
}
return -1; // defensive: unreachable while the switch covers every policy
}
/**
* Stealing from the deque is a lock-free, cross-core "pop", which removes the element from the end opposite to
* "pop". Because the producer and consumer (the core stealing the sandbox request) modify different ends,
* no locks are required, and coordination is achieved by instead retrying on inconsistent indices.
*
* Relevant Read: https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf
*
* TODO: Notice the mutex_lock for NCORES == 1 in both push/pop functions and steal calling 'pop' for NCORES == 1.
* Ideally you don't call steal for same core consumption but I just made the steal API wrap that logic. Which is
* perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core
* and add an assert in "steal" function for NCORES == 1.
*
* @returns A Sandbox Request or NULL
**/
static inline sandbox_request_t *
sandbox_request_queue_fifo_remove(void)
{
sandbox_request_t *sandbox_request = NULL;
int return_code;
#if NCORES == 1
pthread_mutex_lock(&runtime_global_deque_mutex);
return_code = deque_pop_sandbox(runtime_global_deque, &sandbox_request);
pthread_mutex_unlock(&runtime_global_deque_mutex);
#else
return_code = deque_steal_sandbox(runtime_global_deque, &sandbox_request);
#endif
if (return_code) sandbox_request = NULL;
return sandbox_request;
}
static inline sandbox_request_t *
sandbox_request_queue_ps_remove(void)
{
sandbox_request_t *sandbox_request = NULL;
// TODO
return sandbox_request;
}
sandbox_request_t *
sandbox_request_queue_remove(void)
{
switch (sandbox_request_queue_policy) {
case FIFO:
return sandbox_request_queue_fifo_remove();
case PS:
return sandbox_request_queue_ps_remove();
}
return NULL; // defensive: unreachable while the switch covers every policy
}
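
The PS paths above are still TODO stubs. One plausible wiring, not part of this commit, would replace them by backing the policy with the new priority_queue keyed on the request's start_time (a u64 per sandbox_request.h); sandbox_request_get_priority is a hypothetical helper:

// Hypothetical: earliest start_time wins
static unsigned long long int
sandbox_request_get_priority(void *element)
{
	return ((sandbox_request_t *)element)->start_time;
}

static inline void
sandbox_request_queue_ps_initialize(void)
{
	priority_queue_initialize(&sandbox_request_queue_ps, sandbox_request_get_priority);
}

static inline int
sandbox_request_queue_ps_add(sandbox_request_t *sandbox_request)
{
	// -1 (full) and -2 (lock contention) propagate to the caller
	return priority_queue_enqueue(&sandbox_request_queue_ps, sandbox_request);
}

static inline sandbox_request_t *
sandbox_request_queue_ps_remove(void)
{
	return (sandbox_request_t *)priority_queue_dequeue(&sandbox_request_queue_ps);
}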