From 9eb5541afa1d952576f89f633dddd7bf4d127244 Mon Sep 17 00:00:00 2001 From: Sean McBride Date: Sat, 4 Jul 2020 18:48:37 -0400 Subject: [PATCH] feat: propagate dequeue and remove return codes --- runtime/include/deque.h | 176 +++++++++--------- runtime/include/global_request_scheduler.h | 4 +- runtime/include/priority_queue.h | 17 +- runtime/src/global_request_scheduler.c | 6 +- runtime/src/global_request_scheduler_deque.c | 23 ++- .../src/global_request_scheduler_minheap.c | 22 +-- runtime/src/local_runqueue_list.c | 10 +- runtime/src/local_runqueue_minheap.c | 61 +++--- runtime/src/priority_queue.c | 95 +++++----- 9 files changed, 226 insertions(+), 188 deletions(-) diff --git a/runtime/include/deque.h b/runtime/include/deque.h index 2345a1d..a49cfc0 100644 --- a/runtime/include/deque.h +++ b/runtime/include/deque.h @@ -23,91 +23,97 @@ /* TODO: Implement the ability to dynamically resize! */ #define DEQUE_MAX_SZ (1 << 23) -#define DEQUE_PROTOTYPE(name, type) \ - struct deque_##name { \ - type wrk[DEQUE_MAX_SZ]; \ - long size; \ - \ - volatile long top; \ - volatile long bottom; \ - }; \ - \ - static inline void deque_init_##name(struct deque_##name *q, size_t sz) \ - { \ - memset(q, 0, sizeof(struct deque_##name)); \ - \ - if (sz) { \ - /* only for size with pow of 2 */ \ - /* assert((sz & (sz - 1)) == 0); */ \ - assert(sz <= DEQUE_MAX_SZ); \ - } else { \ - sz = DEQUE_MAX_SZ; \ - } \ - \ - q->size = sz; \ - } \ - \ - /* Use mutual exclusion locks around push/pop if multi-threaded. */ \ - static inline int deque_push_##name(struct deque_##name *q, type *w) \ - { \ - long ct, cb; \ - \ - ct = q->top; \ - cb = q->bottom; \ - \ - /* nope, fixed size only */ \ - if (q->size - 1 < (cb - ct)) return -ENOSPC; \ - \ - q->wrk[cb] = *w; \ - __sync_synchronize(); \ - if (__sync_bool_compare_and_swap(&q->bottom, cb, cb + 1) == false) assert(0); \ - \ - return 0; \ - } \ - \ - /* Use mutual exclusion locks around push/pop if multi-threaded. */ \ - static inline int deque_pop_##name(struct deque_##name *q, type *w) \ - { \ - long ct = 0, sz = 0; \ - long cb = q->bottom - 1; \ - int ret = 0; \ - \ - if (__sync_bool_compare_and_swap(&q->bottom, cb + 1, cb) == false) assert(0); \ - \ - ct = q->top; \ - sz = cb - ct; \ - if (sz < 0) { \ - if (__sync_bool_compare_and_swap(&q->bottom, cb, ct) == false) assert(0); \ - \ - return -ENOENT; \ - } \ - \ - *w = q->wrk[cb]; \ - if (sz > 0) return 0; \ - \ - ret = __sync_bool_compare_and_swap(&q->top, ct, ct + 1); \ - if (__sync_bool_compare_and_swap(&q->bottom, cb, ct + 1) == false) assert(0); \ - if (ret == false) { \ - *w = NULL; \ - return -ENOENT; \ - } \ - \ - return 0; \ - } \ - \ - static inline int deque_steal_##name(struct deque_##name *q, type *w) \ - { \ - long ct, cb; \ - \ - ct = q->top; \ - cb = q->bottom; \ - \ - if (ct >= cb) return -ENOENT; \ - \ - *w = q->wrk[ct]; \ - if (__sync_bool_compare_and_swap(&q->top, ct, ct + 1) == false) return -EAGAIN; \ - \ - return 0; \ +#define DEQUE_PROTOTYPE(name, type) \ + struct deque_##name { \ + type wrk[DEQUE_MAX_SZ]; \ + long size; \ + \ + volatile long top; \ + volatile long bottom; \ + }; \ + \ + static inline void deque_init_##name(struct deque_##name *q, size_t sz) \ + { \ + memset(q, 0, sizeof(struct deque_##name)); \ + \ + if (sz) { \ + /* only for size with pow of 2 */ \ + /* assert((sz & (sz - 1)) == 0); */ \ + assert(sz <= DEQUE_MAX_SZ); \ + } else { \ + sz = DEQUE_MAX_SZ; \ + } \ + \ + q->size = sz; \ + } \ + \ + /* Use mutual exclusion locks around push/pop if multi-threaded. */ \ + static inline int deque_push_##name(struct deque_##name *q, type *w) \ + { \ + long ct, cb; \ + \ + ct = q->top; \ + cb = q->bottom; \ + \ + /* nope, fixed size only */ \ + if (q->size - 1 < (cb - ct)) return -ENOSPC; \ + \ + q->wrk[cb] = *w; \ + __sync_synchronize(); \ + if (__sync_bool_compare_and_swap(&q->bottom, cb, cb + 1) == false) assert(0); \ + \ + return 0; \ + } \ + \ + /* Use mutual exclusion locks around push/pop if multi-threaded. */ \ + static inline int deque_pop_##name(struct deque_##name *q, type *w) \ + { \ + long ct = 0, sz = 0; \ + long cb = q->bottom - 1; \ + int ret = 0; \ + \ + if (__sync_bool_compare_and_swap(&q->bottom, cb + 1, cb) == false) assert(0); \ + \ + ct = q->top; \ + sz = cb - ct; \ + if (sz < 0) { \ + if (__sync_bool_compare_and_swap(&q->bottom, cb, ct) == false) assert(0); \ + \ + return -ENOENT; \ + } \ + \ + *w = q->wrk[cb]; \ + if (sz > 0) return 0; \ + \ + ret = __sync_bool_compare_and_swap(&q->top, ct, ct + 1); \ + if (__sync_bool_compare_and_swap(&q->bottom, cb, ct + 1) == false) assert(0); \ + if (ret == false) { \ + *w = NULL; \ + return -ENOENT; \ + } \ + \ + return 0; \ + } \ + /** \ + * deque_steal \ + * @param deque \ + * @param w pointer to location to copy stolen type to \ + * @returns 0 if successful, -2 if empty, -11 if unable to perform atomic operation \ + */ \ + static inline int deque_steal_##name(struct deque_##name *deque, type *w) \ + { \ + long ct, cb; \ + \ + ct = deque->top; \ + cb = deque->bottom; \ + \ + /* Empty */ \ + if (ct >= cb) return -ENOENT; \ + \ + *w = deque->wrk[ct]; \ + if (__sync_bool_compare_and_swap(&deque->top, ct, ct + 1) == false) return -EAGAIN; \ + \ + return 0; \ } #endif /* DEQUE_H */ diff --git a/runtime/include/global_request_scheduler.h b/runtime/include/global_request_scheduler.h index 0972e1c..252dde5 100644 --- a/runtime/include/global_request_scheduler.h +++ b/runtime/include/global_request_scheduler.h @@ -4,7 +4,7 @@ /* Returns pointer back if successful, null otherwise */ typedef sandbox_request_t *(*global_request_scheduler_add_fn_t)(void *); -typedef sandbox_request_t *(*global_request_scheduler_remove_fn_t)(void); +typedef int (*global_request_scheduler_remove_fn_t)(sandbox_request_t **); typedef uint64_t (*global_request_scheduler_peek_fn_t)(void); struct global_request_scheduler_config { @@ -17,5 +17,5 @@ struct global_request_scheduler_config { void global_request_scheduler_initialize(struct global_request_scheduler_config *config); sandbox_request_t *global_request_scheduler_add(sandbox_request_t *); -sandbox_request_t *global_request_scheduler_remove(); +int global_request_scheduler_remove(sandbox_request_t **); uint64_t global_request_scheduler_peek(); diff --git a/runtime/include/priority_queue.h b/runtime/include/priority_queue.h index d178095..65a07f2 100644 --- a/runtime/include/priority_queue.h +++ b/runtime/include/priority_queue.h @@ -24,11 +24,22 @@ struct priority_queue { priority_queue_get_priority_fn_t get_priority_fn; }; +/** + * Checks if a priority queue is empty + * @param self the priority queue to check + * @returns true if empty, else otherwise + */ +static inline bool +priority_queue_is_empty(struct priority_queue *self) +{ + return self->highest_priority == ULONG_MAX; +} + void priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_fn_t get_priority_fn); -int priority_queue_enqueue(struct priority_queue *self, void *value, char *name); -void * priority_queue_dequeue(struct priority_queue *self, char *name); +int priority_queue_enqueue(struct priority_queue *self, void *value); +int priority_queue_dequeue(struct priority_queue *self, void **dequeued_element); int priority_queue_length(struct priority_queue *self); uint64_t priority_queue_peek(struct priority_queue *self); -int priority_queue_delete(struct priority_queue *self, void *value, char *name); +int priority_queue_delete(struct priority_queue *self, void *value); #endif /* PRIORITY_QUEUE_H */ \ No newline at end of file diff --git a/runtime/src/global_request_scheduler.c b/runtime/src/global_request_scheduler.c index 0198e2d..53afafc 100644 --- a/runtime/src/global_request_scheduler.c +++ b/runtime/src/global_request_scheduler.c @@ -30,11 +30,11 @@ global_request_scheduler_add(sandbox_request_t *sandbox_request) * Removes a sandbox request according to the scheduling policy of the variant * @returns pointer to a sandbox request */ -sandbox_request_t * -global_request_scheduler_remove() +int +global_request_scheduler_remove(sandbox_request_t **removed_sandbox) { assert(global_request_scheduler.remove_fn != NULL); - return global_request_scheduler.remove_fn(); + return global_request_scheduler.remove_fn(removed_sandbox); } /** diff --git a/runtime/src/global_request_scheduler_deque.c b/runtime/src/global_request_scheduler_deque.c index 7ee14b9..45a9eac 100644 --- a/runtime/src/global_request_scheduler_deque.c +++ b/runtime/src/global_request_scheduler_deque.c @@ -39,22 +39,27 @@ We are unsure if the locking behavior is correct, so there may be deadlocks */ * perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core * and add an assert in "steal" function for NCORES == 1. * - * @returns A Sandbox Request or NULL + * @returns 0 if successfully returned a sandbox request, -1 if empty, -2 if atomic instruction unsuccessful */ -static sandbox_request_t * -global_request_scheduler_deque_remove(void) +static int +global_request_scheduler_deque_remove(sandbox_request_t **removed_sandbox_request) { - sandbox_request_t *sandbox_request; - + int return_code; #if NCORES == 1 pthread_mutex_lock(&global_request_scheduler_deque_mutex); - return_code = deque_pop_sandbox(global_request_scheduler_deque, sandbox_request); + return_code = deque_pop_sandbox(global_request_scheduler_deque, *removed_sandbox_request); pthread_mutex_unlock(&global_request_scheduler_deque_mutex); #else - int return_code = deque_steal_sandbox(global_request_scheduler_deque, &sandbox_request); + return_code = deque_steal_sandbox(global_request_scheduler_deque, removed_sandbox_request); + /* The Deque uses different return codes other than 0, so map here */ + if (return_code == -2) { + return_code = -1; + } else if (return_code == -11) { + return_code = -2; + } + #endif - if (return_code) sandbox_request = NULL; - return sandbox_request; + return return_code; } void diff --git a/runtime/src/global_request_scheduler_minheap.c b/runtime/src/global_request_scheduler_minheap.c index af86c23..899f4f3 100644 --- a/runtime/src/global_request_scheduler_minheap.c +++ b/runtime/src/global_request_scheduler_minheap.c @@ -1,4 +1,5 @@ #include "global_request_scheduler.h" +#include "panic.h" #include "priority_queue.h" static struct priority_queue global_request_scheduler_minheap; @@ -11,25 +12,20 @@ static struct priority_queue global_request_scheduler_minheap; static sandbox_request_t * global_request_scheduler_minheap_add(void *sandbox_request) { - int return_code = priority_queue_enqueue(&global_request_scheduler_minheap, sandbox_request, "Request"); - - if (return_code == -1) { - printf("Request Queue is full\n"); - exit(EXIT_FAILURE); - } - - if (return_code != 0) return NULL; + int return_code = priority_queue_enqueue(&global_request_scheduler_minheap, sandbox_request); + /* TODO: Propagate -1 to caller */ + if (return_code == -1) panic("Request Queue is full\n"); return sandbox_request; } /** - * - * @returns A Sandbox Request or NULL + * @param pointer to the pointer that we want to set to the address of the removed sandbox request + * @returns 0 if successful, -1 if empty, -2 if unable to take lock or perform atomic operation */ -static sandbox_request_t * -global_request_scheduler_minheap_remove(void) +int +global_request_scheduler_minheap_remove(sandbox_request_t **removed_sandbox_request) { - return (sandbox_request_t *)priority_queue_dequeue(&global_request_scheduler_minheap, "Request"); + return priority_queue_dequeue(&global_request_scheduler_minheap, (void **)removed_sandbox_request); } /** diff --git a/runtime/src/local_runqueue_list.c b/runtime/src/local_runqueue_list.c index e9d6a6c..6080e91 100644 --- a/runtime/src/local_runqueue_list.c +++ b/runtime/src/local_runqueue_list.c @@ -44,8 +44,14 @@ struct sandbox * local_runqueue_list_get_next() { if (local_runqueue_is_empty()) { - sandbox_request_t *sandbox_request = global_request_scheduler_remove(); - if (sandbox_request == NULL) return NULL; + sandbox_request_t *sandbox_request; + + int return_code = global_request_scheduler_remove(&sandbox_request); + if (return_code != 0) return NULL; + + /* TODO: sandbox_allocate should free sandbox_request on success */ + /* TODO: sandbox_allocate should return RC so we can readd sandbox_request to global_request_scheduler + * if needed */ struct sandbox *sandbox = sandbox_allocate(sandbox_request); assert(sandbox); free(sandbox_request); diff --git a/runtime/src/local_runqueue_minheap.c b/runtime/src/local_runqueue_minheap.c index 581403a..98ebdcf 100644 --- a/runtime/src/local_runqueue_minheap.c +++ b/runtime/src/local_runqueue_minheap.c @@ -17,9 +17,7 @@ __thread static struct priority_queue local_runqueue_minheap; bool local_runqueue_minheap_is_empty() { - int length = priority_queue_length(&local_runqueue_minheap); - assert(length < 5); - return priority_queue_length(&local_runqueue_minheap) == 0; + return priority_queue_is_empty(&local_runqueue_minheap); } /** @@ -30,23 +28,20 @@ local_runqueue_minheap_is_empty() void local_runqueue_minheap_add(struct sandbox *sandbox) { - int original_length = priority_queue_length(&local_runqueue_minheap); - - int return_code = priority_queue_enqueue(&local_runqueue_minheap, sandbox, "Runqueue"); + int return_code = priority_queue_enqueue(&local_runqueue_minheap, sandbox); + /* TODO: propagate RC to caller */ if (return_code == -1) panic("Thread Runqueue is full!\n"); - - /* Assumption: Should always take lock because thread-local runqueue */ - assert(return_code != -2); } /** * Removes the highest priority sandbox from the run queue - * @returns A Sandbox or NULL if empty + * @param pointer to test to address of removed sandbox if successful + * @returns 0 if successful, -1 if empty, -2 if unable to take lock */ -static struct sandbox * -local_runqueue_minheap_remove() +static int +local_runqueue_minheap_remove(struct sandbox **to_remove) { - return (struct sandbox *)priority_queue_dequeue(&local_runqueue_minheap, "Runqueue"); + return priority_queue_dequeue(&local_runqueue_minheap, (void **)to_remove); } /** @@ -57,7 +52,7 @@ static void local_runqueue_minheap_delete(struct sandbox *sandbox) { assert(sandbox != NULL); - int rc = priority_queue_delete(&local_runqueue_minheap, sandbox, "Runqueue"); + int rc = priority_queue_delete(&local_runqueue_minheap, sandbox); if (rc == -1) { panic("Err: Thread Local %lu tried to delete sandbox %lu from runqueue, but was not present\n", pthread_self(), sandbox->start_time); @@ -75,24 +70,31 @@ local_runqueue_minheap_delete(struct sandbox *sandbox) struct sandbox * local_runqueue_minheap_get_next() { - if (local_runqueue_is_empty()) { - /* Try to pull a sandbox request and return NULL if we're unable to get one */ + struct sandbox *sandbox = NULL; + int sandbox_rc = local_runqueue_minheap_remove(&sandbox); + + if (sandbox_rc == 0) { + /* Sandbox ready pulled from local runqueue */ + + /* TODO: We remove and immediately re-add sandboxes. This seems like extra work. Consider an API to get + * the min without removing it + */ + local_runqueue_minheap_add(sandbox); + } else if (sandbox_rc == -1) { + /* local runqueue was empty, try to pull a sandbox request and return NULL if we're unable to get one */ sandbox_request_t *sandbox_request; - if ((sandbox_request = global_request_scheduler_remove()) == NULL) { return NULL; }; + int sandbox_request_rc = global_request_scheduler_remove(&sandbox_request); + if (sandbox_request_rc != 0) return NULL; - /* Otherwise, allocate the sandbox request as a runnable sandbox and place on the runqueue */ - struct sandbox *sandbox = sandbox_allocate(sandbox_request); - if (sandbox == NULL) return NULL; + sandbox = sandbox_allocate(sandbox_request); assert(sandbox); free(sandbox_request); sandbox->state = RUNNABLE; local_runqueue_minheap_add(sandbox); - return sandbox; + } else if (sandbox_rc == -2) { + /* Unable to take lock, so just return NULL and try later */ + assert(sandbox == NULL); } - - /* Resume the sandbox at the top of the runqueue */ - struct sandbox *sandbox = local_runqueue_minheap_remove(); - local_runqueue_minheap_add(sandbox); return sandbox; } @@ -128,7 +130,13 @@ local_runqueue_minheap_preempt(ucontext_t *user_context) /* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */ sandbox_request_t *sandbox_request; - if (global_deadline < local_deadline && (sandbox_request = global_request_scheduler_remove()) != NULL) { + if (global_deadline < local_deadline) { + sandbox_request_t *sandbox_request; + int return_code = global_request_scheduler_remove(&sandbox_request); + + // If we were unable to get a sandbox_request, exit + if (return_code == -1 || return_code == -2) goto done; + printf("Thread %lu Preempted %lu for %lu\n", pthread_self(), local_deadline, sandbox_request->absolute_deadline); @@ -155,6 +163,7 @@ local_runqueue_minheap_preempt(ucontext_t *user_context) if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1) should_enable_software_interrupt = false; } +done: if (should_enable_software_interrupt) software_interrupt_enable(); } diff --git a/runtime/src/priority_queue.c b/runtime/src/priority_queue.c index 6a3d9c9..d04179e 100644 --- a/runtime/src/priority_queue.c +++ b/runtime/src/priority_queue.c @@ -110,6 +110,19 @@ priority_queue_percolate_down(struct priority_queue *self, int parent_index) } } +/** + * Checks if a priority queue is empty + * @param self the priority queue to check + * @returns true if empty, else otherwise + */ +static inline bool +priority_queue_is_empty_locked(struct priority_queue *self) +{ + assert(self != NULL); + assert(ck_spinlock_fas_locked(&self->lock)); + return self->first_free == 1; +} + /********************* * Public API * ********************/ @@ -152,22 +165,15 @@ priority_queue_length(struct priority_queue *self) /** * @param self - the priority queue we want to add to * @param value - the value we want to add - * @returns 0 on success. -1 on full. -2 on unable to take lock + * @returns 0 on success. -1 on full. */ int -priority_queue_enqueue(struct priority_queue *self, void *value, char *name) +priority_queue_enqueue(struct priority_queue *self, void *value) { assert(self != NULL); ck_spinlock_fas_lock(&self->lock); - int pre_length = self->first_free - 1; - - if (priority_queue_append(self, value) == -1) panic("Priority Queue is full"); - - int post_length = self->first_free - 1; - - /* We should have appended here */ - assert(post_length == pre_length + 1); + if (priority_queue_append(self, value) == -1) return -1; /* If this is the first element we add, update the highest priority */ if (self->first_free == 2) { @@ -184,7 +190,7 @@ priority_queue_enqueue(struct priority_queue *self, void *value, char *name) * @returns 0 on success. -1 on not found */ int -priority_queue_delete(struct priority_queue *self, void *value, char *name) +priority_queue_delete(struct priority_queue *self, void *value) { assert(self != NULL); ck_spinlock_fas_lock(&self->lock); @@ -204,49 +210,48 @@ priority_queue_delete(struct priority_queue *self, void *value, char *name) return 0; } -static bool -priority_queue_is_empty(struct priority_queue *self) -{ - assert(self != NULL); - bool caller_locked = ck_spinlock_fas_locked(&self->lock); - if (!caller_locked) ck_spinlock_fas_lock(&self->lock); - assert(self->first_free != 0); - bool is_empty = self->first_free == 1; - if (!caller_locked) ck_spinlock_fas_unlock(&self->lock); - return is_empty; -} - /** * @param self - the priority queue we want to add to - * @returns The head of the priority queue or NULL when empty + * @param dequeued_element a pointer to set to the dequeued element + * @returns RC 0 if successfully set dequeued_element, -1 if empty, -2 if unable to take lock */ -void * -priority_queue_dequeue(struct priority_queue *self, char *name) +int +priority_queue_dequeue(struct priority_queue *self, void **dequeued_element) { assert(self != NULL); + assert(dequeued_element != NULL); assert(self->get_priority_fn != NULL); - if (priority_queue_is_empty(self)) return NULL; - ck_spinlock_fas_lock(&self->lock); + int return_code; - void *min = NULL; - if (!priority_queue_is_empty(self)) { - min = self->items[1]; - self->items[1] = self->items[--self->first_free]; - self->items[self->first_free] = NULL; - /* Because of 1-based indices, first_free is 2 when there is only one element */ - if (self->first_free > 2) priority_queue_percolate_down(self, 1); - - /* Update the highest priority */ - if (!priority_queue_is_empty(self)) { - self->highest_priority = self->get_priority_fn(self->items[1]); - } else { - self->highest_priority = ULONG_MAX; - } + if (ck_spinlock_fas_trylock(&self->lock) == false) { + return_code = -2; + goto done; + }; + + if (priority_queue_is_empty_locked(self)) { + return_code = -1; + goto release_lock; } - ck_spinlock_fas_unlock(&self->lock); - return min; + *dequeued_element = self->items[1]; + self->items[1] = self->items[--self->first_free]; + self->items[self->first_free] = NULL; + /* Because of 1-based indices, first_free is 2 when there is only one element */ + if (self->first_free > 2) priority_queue_percolate_down(self, 1); + + /* Update the highest priority */ + if (!priority_queue_is_empty_locked(self)) { + self->highest_priority = self->get_priority_fn(self->items[1]); + } else { + self->highest_priority = ULONG_MAX; + } + return_code = 0; + +release_lock: + ck_spinlock_fas_unlock(&self->lock); +done: + return return_code; } /** @@ -259,4 +264,4 @@ uint64_t priority_queue_peek(struct priority_queue *self) { return self->highest_priority; -} \ No newline at end of file +}