feat: propagate dequeue and remove return codes

sledge_graph
Sean McBride 5 years ago
parent d36b28bf21
commit 9eb5541afa

@ -23,91 +23,97 @@
/* TODO: Implement the ability to dynamically resize! */ /* TODO: Implement the ability to dynamically resize! */
#define DEQUE_MAX_SZ (1 << 23) #define DEQUE_MAX_SZ (1 << 23)
#define DEQUE_PROTOTYPE(name, type) \ #define DEQUE_PROTOTYPE(name, type) \
struct deque_##name { \ struct deque_##name { \
type wrk[DEQUE_MAX_SZ]; \ type wrk[DEQUE_MAX_SZ]; \
long size; \ long size; \
\ \
volatile long top; \ volatile long top; \
volatile long bottom; \ volatile long bottom; \
}; \ }; \
\ \
static inline void deque_init_##name(struct deque_##name *q, size_t sz) \ static inline void deque_init_##name(struct deque_##name *q, size_t sz) \
{ \ { \
memset(q, 0, sizeof(struct deque_##name)); \ memset(q, 0, sizeof(struct deque_##name)); \
\ \
if (sz) { \ if (sz) { \
/* only for size with pow of 2 */ \ /* only for size with pow of 2 */ \
/* assert((sz & (sz - 1)) == 0); */ \ /* assert((sz & (sz - 1)) == 0); */ \
assert(sz <= DEQUE_MAX_SZ); \ assert(sz <= DEQUE_MAX_SZ); \
} else { \ } else { \
sz = DEQUE_MAX_SZ; \ sz = DEQUE_MAX_SZ; \
} \ } \
\ \
q->size = sz; \ q->size = sz; \
} \ } \
\ \
/* Use mutual exclusion locks around push/pop if multi-threaded. */ \ /* Use mutual exclusion locks around push/pop if multi-threaded. */ \
static inline int deque_push_##name(struct deque_##name *q, type *w) \ static inline int deque_push_##name(struct deque_##name *q, type *w) \
{ \ { \
long ct, cb; \ long ct, cb; \
\ \
ct = q->top; \ ct = q->top; \
cb = q->bottom; \ cb = q->bottom; \
\ \
/* nope, fixed size only */ \ /* nope, fixed size only */ \
if (q->size - 1 < (cb - ct)) return -ENOSPC; \ if (q->size - 1 < (cb - ct)) return -ENOSPC; \
\ \
q->wrk[cb] = *w; \ q->wrk[cb] = *w; \
__sync_synchronize(); \ __sync_synchronize(); \
if (__sync_bool_compare_and_swap(&q->bottom, cb, cb + 1) == false) assert(0); \ if (__sync_bool_compare_and_swap(&q->bottom, cb, cb + 1) == false) assert(0); \
\ \
return 0; \ return 0; \
} \ } \
\ \
/* Use mutual exclusion locks around push/pop if multi-threaded. */ \ /* Use mutual exclusion locks around push/pop if multi-threaded. */ \
static inline int deque_pop_##name(struct deque_##name *q, type *w) \ static inline int deque_pop_##name(struct deque_##name *q, type *w) \
{ \ { \
long ct = 0, sz = 0; \ long ct = 0, sz = 0; \
long cb = q->bottom - 1; \ long cb = q->bottom - 1; \
int ret = 0; \ int ret = 0; \
\ \
if (__sync_bool_compare_and_swap(&q->bottom, cb + 1, cb) == false) assert(0); \ if (__sync_bool_compare_and_swap(&q->bottom, cb + 1, cb) == false) assert(0); \
\ \
ct = q->top; \ ct = q->top; \
sz = cb - ct; \ sz = cb - ct; \
if (sz < 0) { \ if (sz < 0) { \
if (__sync_bool_compare_and_swap(&q->bottom, cb, ct) == false) assert(0); \ if (__sync_bool_compare_and_swap(&q->bottom, cb, ct) == false) assert(0); \
\ \
return -ENOENT; \ return -ENOENT; \
} \ } \
\ \
*w = q->wrk[cb]; \ *w = q->wrk[cb]; \
if (sz > 0) return 0; \ if (sz > 0) return 0; \
\ \
ret = __sync_bool_compare_and_swap(&q->top, ct, ct + 1); \ ret = __sync_bool_compare_and_swap(&q->top, ct, ct + 1); \
if (__sync_bool_compare_and_swap(&q->bottom, cb, ct + 1) == false) assert(0); \ if (__sync_bool_compare_and_swap(&q->bottom, cb, ct + 1) == false) assert(0); \
if (ret == false) { \ if (ret == false) { \
*w = NULL; \ *w = NULL; \
return -ENOENT; \ return -ENOENT; \
} \ } \
\ \
return 0; \ return 0; \
} \ } \
\ /** \
static inline int deque_steal_##name(struct deque_##name *q, type *w) \ * deque_steal \
{ \ * @param deque \
long ct, cb; \ * @param w pointer to location to copy stolen type to \
\ * @returns 0 if successful, -2 if empty, -11 if unable to perform atomic operation \
ct = q->top; \ */ \
cb = q->bottom; \ static inline int deque_steal_##name(struct deque_##name *deque, type *w) \
\ { \
if (ct >= cb) return -ENOENT; \ long ct, cb; \
\ \
*w = q->wrk[ct]; \ ct = deque->top; \
if (__sync_bool_compare_and_swap(&q->top, ct, ct + 1) == false) return -EAGAIN; \ cb = deque->bottom; \
\ \
return 0; \ /* Empty */ \
if (ct >= cb) return -ENOENT; \
\
*w = deque->wrk[ct]; \
if (__sync_bool_compare_and_swap(&deque->top, ct, ct + 1) == false) return -EAGAIN; \
\
return 0; \
} }
#endif /* DEQUE_H */ #endif /* DEQUE_H */

@ -4,7 +4,7 @@
/* Returns pointer back if successful, null otherwise */ /* Returns pointer back if successful, null otherwise */
typedef sandbox_request_t *(*global_request_scheduler_add_fn_t)(void *); typedef sandbox_request_t *(*global_request_scheduler_add_fn_t)(void *);
typedef sandbox_request_t *(*global_request_scheduler_remove_fn_t)(void); typedef int (*global_request_scheduler_remove_fn_t)(sandbox_request_t **);
typedef uint64_t (*global_request_scheduler_peek_fn_t)(void); typedef uint64_t (*global_request_scheduler_peek_fn_t)(void);
struct global_request_scheduler_config { struct global_request_scheduler_config {
@ -17,5 +17,5 @@ struct global_request_scheduler_config {
void global_request_scheduler_initialize(struct global_request_scheduler_config *config); void global_request_scheduler_initialize(struct global_request_scheduler_config *config);
sandbox_request_t *global_request_scheduler_add(sandbox_request_t *); sandbox_request_t *global_request_scheduler_add(sandbox_request_t *);
sandbox_request_t *global_request_scheduler_remove(); int global_request_scheduler_remove(sandbox_request_t **);
uint64_t global_request_scheduler_peek(); uint64_t global_request_scheduler_peek();

@ -24,11 +24,22 @@ struct priority_queue {
priority_queue_get_priority_fn_t get_priority_fn; priority_queue_get_priority_fn_t get_priority_fn;
}; };
/**
* Checks if a priority queue is empty
* @param self the priority queue to check
* @returns true if empty, else otherwise
*/
static inline bool
priority_queue_is_empty(struct priority_queue *self)
{
return self->highest_priority == ULONG_MAX;
}
void priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_fn_t get_priority_fn); void priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_fn_t get_priority_fn);
int priority_queue_enqueue(struct priority_queue *self, void *value, char *name); int priority_queue_enqueue(struct priority_queue *self, void *value);
void * priority_queue_dequeue(struct priority_queue *self, char *name); int priority_queue_dequeue(struct priority_queue *self, void **dequeued_element);
int priority_queue_length(struct priority_queue *self); int priority_queue_length(struct priority_queue *self);
uint64_t priority_queue_peek(struct priority_queue *self); uint64_t priority_queue_peek(struct priority_queue *self);
int priority_queue_delete(struct priority_queue *self, void *value, char *name); int priority_queue_delete(struct priority_queue *self, void *value);
#endif /* PRIORITY_QUEUE_H */ #endif /* PRIORITY_QUEUE_H */

@ -30,11 +30,11 @@ global_request_scheduler_add(sandbox_request_t *sandbox_request)
* Removes a sandbox request according to the scheduling policy of the variant * Removes a sandbox request according to the scheduling policy of the variant
* @returns pointer to a sandbox request * @returns pointer to a sandbox request
*/ */
sandbox_request_t * int
global_request_scheduler_remove() global_request_scheduler_remove(sandbox_request_t **removed_sandbox)
{ {
assert(global_request_scheduler.remove_fn != NULL); assert(global_request_scheduler.remove_fn != NULL);
return global_request_scheduler.remove_fn(); return global_request_scheduler.remove_fn(removed_sandbox);
} }
/** /**

@ -39,22 +39,27 @@ We are unsure if the locking behavior is correct, so there may be deadlocks */
* perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core * perhaps not good. We might just add the #if in the scheduling code which should explicitly call "pop" for single core
* and add an assert in "steal" function for NCORES == 1. * and add an assert in "steal" function for NCORES == 1.
* *
* @returns A Sandbox Request or NULL * @returns 0 if successfully returned a sandbox request, -1 if empty, -2 if atomic instruction unsuccessful
*/ */
static sandbox_request_t * static int
global_request_scheduler_deque_remove(void) global_request_scheduler_deque_remove(sandbox_request_t **removed_sandbox_request)
{ {
sandbox_request_t *sandbox_request; int return_code;
#if NCORES == 1 #if NCORES == 1
pthread_mutex_lock(&global_request_scheduler_deque_mutex); pthread_mutex_lock(&global_request_scheduler_deque_mutex);
return_code = deque_pop_sandbox(global_request_scheduler_deque, sandbox_request); return_code = deque_pop_sandbox(global_request_scheduler_deque, *removed_sandbox_request);
pthread_mutex_unlock(&global_request_scheduler_deque_mutex); pthread_mutex_unlock(&global_request_scheduler_deque_mutex);
#else #else
int return_code = deque_steal_sandbox(global_request_scheduler_deque, &sandbox_request); return_code = deque_steal_sandbox(global_request_scheduler_deque, removed_sandbox_request);
/* The Deque uses different return codes other than 0, so map here */
if (return_code == -2) {
return_code = -1;
} else if (return_code == -11) {
return_code = -2;
}
#endif #endif
if (return_code) sandbox_request = NULL; return return_code;
return sandbox_request;
} }
void void

@ -1,4 +1,5 @@
#include "global_request_scheduler.h" #include "global_request_scheduler.h"
#include "panic.h"
#include "priority_queue.h" #include "priority_queue.h"
static struct priority_queue global_request_scheduler_minheap; static struct priority_queue global_request_scheduler_minheap;
@ -11,25 +12,20 @@ static struct priority_queue global_request_scheduler_minheap;
static sandbox_request_t * static sandbox_request_t *
global_request_scheduler_minheap_add(void *sandbox_request) global_request_scheduler_minheap_add(void *sandbox_request)
{ {
int return_code = priority_queue_enqueue(&global_request_scheduler_minheap, sandbox_request, "Request"); int return_code = priority_queue_enqueue(&global_request_scheduler_minheap, sandbox_request);
/* TODO: Propagate -1 to caller */
if (return_code == -1) { if (return_code == -1) panic("Request Queue is full\n");
printf("Request Queue is full\n");
exit(EXIT_FAILURE);
}
if (return_code != 0) return NULL;
return sandbox_request; return sandbox_request;
} }
/** /**
* * @param pointer to the pointer that we want to set to the address of the removed sandbox request
* @returns A Sandbox Request or NULL * @returns 0 if successful, -1 if empty, -2 if unable to take lock or perform atomic operation
*/ */
static sandbox_request_t * int
global_request_scheduler_minheap_remove(void) global_request_scheduler_minheap_remove(sandbox_request_t **removed_sandbox_request)
{ {
return (sandbox_request_t *)priority_queue_dequeue(&global_request_scheduler_minheap, "Request"); return priority_queue_dequeue(&global_request_scheduler_minheap, (void **)removed_sandbox_request);
} }
/** /**

@ -44,8 +44,14 @@ struct sandbox *
local_runqueue_list_get_next() local_runqueue_list_get_next()
{ {
if (local_runqueue_is_empty()) { if (local_runqueue_is_empty()) {
sandbox_request_t *sandbox_request = global_request_scheduler_remove(); sandbox_request_t *sandbox_request;
if (sandbox_request == NULL) return NULL;
int return_code = global_request_scheduler_remove(&sandbox_request);
if (return_code != 0) return NULL;
/* TODO: sandbox_allocate should free sandbox_request on success */
/* TODO: sandbox_allocate should return RC so we can readd sandbox_request to global_request_scheduler
* if needed */
struct sandbox *sandbox = sandbox_allocate(sandbox_request); struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox); assert(sandbox);
free(sandbox_request); free(sandbox_request);

@ -17,9 +17,7 @@ __thread static struct priority_queue local_runqueue_minheap;
bool bool
local_runqueue_minheap_is_empty() local_runqueue_minheap_is_empty()
{ {
int length = priority_queue_length(&local_runqueue_minheap); return priority_queue_is_empty(&local_runqueue_minheap);
assert(length < 5);
return priority_queue_length(&local_runqueue_minheap) == 0;
} }
/** /**
@ -30,23 +28,20 @@ local_runqueue_minheap_is_empty()
void void
local_runqueue_minheap_add(struct sandbox *sandbox) local_runqueue_minheap_add(struct sandbox *sandbox)
{ {
int original_length = priority_queue_length(&local_runqueue_minheap); int return_code = priority_queue_enqueue(&local_runqueue_minheap, sandbox);
/* TODO: propagate RC to caller */
int return_code = priority_queue_enqueue(&local_runqueue_minheap, sandbox, "Runqueue");
if (return_code == -1) panic("Thread Runqueue is full!\n"); if (return_code == -1) panic("Thread Runqueue is full!\n");
/* Assumption: Should always take lock because thread-local runqueue */
assert(return_code != -2);
} }
/** /**
* Removes the highest priority sandbox from the run queue * Removes the highest priority sandbox from the run queue
* @returns A Sandbox or NULL if empty * @param pointer to test to address of removed sandbox if successful
* @returns 0 if successful, -1 if empty, -2 if unable to take lock
*/ */
static struct sandbox * static int
local_runqueue_minheap_remove() local_runqueue_minheap_remove(struct sandbox **to_remove)
{ {
return (struct sandbox *)priority_queue_dequeue(&local_runqueue_minheap, "Runqueue"); return priority_queue_dequeue(&local_runqueue_minheap, (void **)to_remove);
} }
/** /**
@ -57,7 +52,7 @@ static void
local_runqueue_minheap_delete(struct sandbox *sandbox) local_runqueue_minheap_delete(struct sandbox *sandbox)
{ {
assert(sandbox != NULL); assert(sandbox != NULL);
int rc = priority_queue_delete(&local_runqueue_minheap, sandbox, "Runqueue"); int rc = priority_queue_delete(&local_runqueue_minheap, sandbox);
if (rc == -1) { if (rc == -1) {
panic("Err: Thread Local %lu tried to delete sandbox %lu from runqueue, but was not present\n", panic("Err: Thread Local %lu tried to delete sandbox %lu from runqueue, but was not present\n",
pthread_self(), sandbox->start_time); pthread_self(), sandbox->start_time);
@ -75,24 +70,31 @@ local_runqueue_minheap_delete(struct sandbox *sandbox)
struct sandbox * struct sandbox *
local_runqueue_minheap_get_next() local_runqueue_minheap_get_next()
{ {
if (local_runqueue_is_empty()) { struct sandbox *sandbox = NULL;
/* Try to pull a sandbox request and return NULL if we're unable to get one */ int sandbox_rc = local_runqueue_minheap_remove(&sandbox);
if (sandbox_rc == 0) {
/* Sandbox ready pulled from local runqueue */
/* TODO: We remove and immediately re-add sandboxes. This seems like extra work. Consider an API to get
* the min without removing it
*/
local_runqueue_minheap_add(sandbox);
} else if (sandbox_rc == -1) {
/* local runqueue was empty, try to pull a sandbox request and return NULL if we're unable to get one */
sandbox_request_t *sandbox_request; sandbox_request_t *sandbox_request;
if ((sandbox_request = global_request_scheduler_remove()) == NULL) { return NULL; }; int sandbox_request_rc = global_request_scheduler_remove(&sandbox_request);
if (sandbox_request_rc != 0) return NULL;
/* Otherwise, allocate the sandbox request as a runnable sandbox and place on the runqueue */ sandbox = sandbox_allocate(sandbox_request);
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
if (sandbox == NULL) return NULL;
assert(sandbox); assert(sandbox);
free(sandbox_request); free(sandbox_request);
sandbox->state = RUNNABLE; sandbox->state = RUNNABLE;
local_runqueue_minheap_add(sandbox); local_runqueue_minheap_add(sandbox);
return sandbox; } else if (sandbox_rc == -2) {
/* Unable to take lock, so just return NULL and try later */
assert(sandbox == NULL);
} }
/* Resume the sandbox at the top of the runqueue */
struct sandbox *sandbox = local_runqueue_minheap_remove();
local_runqueue_minheap_add(sandbox);
return sandbox; return sandbox;
} }
@ -128,7 +130,13 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
/* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */ /* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */
sandbox_request_t *sandbox_request; sandbox_request_t *sandbox_request;
if (global_deadline < local_deadline && (sandbox_request = global_request_scheduler_remove()) != NULL) { if (global_deadline < local_deadline) {
sandbox_request_t *sandbox_request;
int return_code = global_request_scheduler_remove(&sandbox_request);
// If we were unable to get a sandbox_request, exit
if (return_code == -1 || return_code == -2) goto done;
printf("Thread %lu Preempted %lu for %lu\n", pthread_self(), local_deadline, printf("Thread %lu Preempted %lu for %lu\n", pthread_self(), local_deadline,
sandbox_request->absolute_deadline); sandbox_request->absolute_deadline);
@ -155,6 +163,7 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1) if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1)
should_enable_software_interrupt = false; should_enable_software_interrupt = false;
} }
done:
if (should_enable_software_interrupt) software_interrupt_enable(); if (should_enable_software_interrupt) software_interrupt_enable();
} }

@ -110,6 +110,19 @@ priority_queue_percolate_down(struct priority_queue *self, int parent_index)
} }
} }
/**
* Checks if a priority queue is empty
* @param self the priority queue to check
* @returns true if empty, else otherwise
*/
static inline bool
priority_queue_is_empty_locked(struct priority_queue *self)
{
assert(self != NULL);
assert(ck_spinlock_fas_locked(&self->lock));
return self->first_free == 1;
}
/********************* /*********************
* Public API * * Public API *
********************/ ********************/
@ -152,22 +165,15 @@ priority_queue_length(struct priority_queue *self)
/** /**
* @param self - the priority queue we want to add to * @param self - the priority queue we want to add to
* @param value - the value we want to add * @param value - the value we want to add
* @returns 0 on success. -1 on full. -2 on unable to take lock * @returns 0 on success. -1 on full.
*/ */
int int
priority_queue_enqueue(struct priority_queue *self, void *value, char *name) priority_queue_enqueue(struct priority_queue *self, void *value)
{ {
assert(self != NULL); assert(self != NULL);
ck_spinlock_fas_lock(&self->lock); ck_spinlock_fas_lock(&self->lock);
int pre_length = self->first_free - 1; if (priority_queue_append(self, value) == -1) return -1;
if (priority_queue_append(self, value) == -1) panic("Priority Queue is full");
int post_length = self->first_free - 1;
/* We should have appended here */
assert(post_length == pre_length + 1);
/* If this is the first element we add, update the highest priority */ /* If this is the first element we add, update the highest priority */
if (self->first_free == 2) { if (self->first_free == 2) {
@ -184,7 +190,7 @@ priority_queue_enqueue(struct priority_queue *self, void *value, char *name)
* @returns 0 on success. -1 on not found * @returns 0 on success. -1 on not found
*/ */
int int
priority_queue_delete(struct priority_queue *self, void *value, char *name) priority_queue_delete(struct priority_queue *self, void *value)
{ {
assert(self != NULL); assert(self != NULL);
ck_spinlock_fas_lock(&self->lock); ck_spinlock_fas_lock(&self->lock);
@ -204,49 +210,48 @@ priority_queue_delete(struct priority_queue *self, void *value, char *name)
return 0; return 0;
} }
static bool
priority_queue_is_empty(struct priority_queue *self)
{
assert(self != NULL);
bool caller_locked = ck_spinlock_fas_locked(&self->lock);
if (!caller_locked) ck_spinlock_fas_lock(&self->lock);
assert(self->first_free != 0);
bool is_empty = self->first_free == 1;
if (!caller_locked) ck_spinlock_fas_unlock(&self->lock);
return is_empty;
}
/** /**
* @param self - the priority queue we want to add to * @param self - the priority queue we want to add to
* @returns The head of the priority queue or NULL when empty * @param dequeued_element a pointer to set to the dequeued element
* @returns RC 0 if successfully set dequeued_element, -1 if empty, -2 if unable to take lock
*/ */
void * int
priority_queue_dequeue(struct priority_queue *self, char *name) priority_queue_dequeue(struct priority_queue *self, void **dequeued_element)
{ {
assert(self != NULL); assert(self != NULL);
assert(dequeued_element != NULL);
assert(self->get_priority_fn != NULL); assert(self->get_priority_fn != NULL);
if (priority_queue_is_empty(self)) return NULL;
ck_spinlock_fas_lock(&self->lock); int return_code;
void *min = NULL; if (ck_spinlock_fas_trylock(&self->lock) == false) {
if (!priority_queue_is_empty(self)) { return_code = -2;
min = self->items[1]; goto done;
self->items[1] = self->items[--self->first_free]; };
self->items[self->first_free] = NULL;
/* Because of 1-based indices, first_free is 2 when there is only one element */ if (priority_queue_is_empty_locked(self)) {
if (self->first_free > 2) priority_queue_percolate_down(self, 1); return_code = -1;
goto release_lock;
/* Update the highest priority */
if (!priority_queue_is_empty(self)) {
self->highest_priority = self->get_priority_fn(self->items[1]);
} else {
self->highest_priority = ULONG_MAX;
}
} }
ck_spinlock_fas_unlock(&self->lock);
return min; *dequeued_element = self->items[1];
self->items[1] = self->items[--self->first_free];
self->items[self->first_free] = NULL;
/* Because of 1-based indices, first_free is 2 when there is only one element */
if (self->first_free > 2) priority_queue_percolate_down(self, 1);
/* Update the highest priority */
if (!priority_queue_is_empty_locked(self)) {
self->highest_priority = self->get_priority_fn(self->items[1]);
} else {
self->highest_priority = ULONG_MAX;
}
return_code = 0;
release_lock:
ck_spinlock_fas_unlock(&self->lock);
done:
return return_code;
} }
/** /**
@ -259,4 +264,4 @@ uint64_t
priority_queue_peek(struct priority_queue *self) priority_queue_peek(struct priority_queue *self)
{ {
return self->highest_priority; return self->highest_priority;
} }

Loading…
Cancel
Save