fix: cleanly handling preemption

main
Sean McBride 5 years ago
parent 218893ed3b
commit 653af014a7

@ -129,8 +129,7 @@ priority_queue_initialize(struct priority_queue *self, priority_queue_get_priori
memset(self->items, 0, sizeof(void *) * MAX); memset(self->items, 0, sizeof(void *) * MAX);
ck_spinlock_fas_init(&self->lock); ck_spinlock_fas_init(&self->lock);
self->first_free = 1; self->first_free = 1;
printf("[Init] First Free: %d\n", self->first_free);
self->get_priority = get_priority; self->get_priority = get_priority;
// We're assuming a min-heap implementation, so set to larget possible value // We're assuming a min-heap implementation, so set to larget possible value
@ -144,7 +143,6 @@ priority_queue_initialize(struct priority_queue *self, priority_queue_get_priori
int int
priority_queue_length(struct priority_queue *self) priority_queue_length(struct priority_queue *self)
{ {
// printf("[Length] First Free: %d\n", self->first_free);
assert(self != NULL); assert(self != NULL);
ck_spinlock_fas_lock(&self->lock); ck_spinlock_fas_lock(&self->lock);
assert(ck_spinlock_fas_locked(&self->lock)); assert(ck_spinlock_fas_locked(&self->lock));
@ -176,7 +174,6 @@ priority_queue_enqueue(struct priority_queue *self, void *value, char *name)
} }
int post_length = self->first_free - 1; int post_length = self->first_free - 1;
printf("[%s Enqueue] First Free: %d\n", name, self->first_free);
// We should have appended here // We should have appended here
assert(post_length == pre_length + 1); assert(post_length == pre_length + 1);
@ -217,7 +214,6 @@ priority_queue_delete(struct priority_queue *self, void *value, char *name)
printf("[priority_queue_delete] Not found!\n"); printf("[priority_queue_delete] Not found!\n");
return -1; return -1;
}; };
printf("[%s Delete] First Free: %d\n", name, self->first_free);
return 0; return 0;
} }
@ -259,23 +255,11 @@ priority_queue_dequeue(struct priority_queue *self, char *name)
self->highest_priority = !priority_queue_is_empty(self) ? self->get_priority(self->items[1]) self->highest_priority = !priority_queue_is_empty(self) ? self->get_priority(self->items[1])
: ULONG_MAX; : ULONG_MAX;
} }
printf("[%s Dequeue] First Free: %d\n", name, self->first_free);
ck_spinlock_fas_unlock(&self->lock); ck_spinlock_fas_unlock(&self->lock);
// End of Critical Section // End of Critical Section
return min; return min;
} }
// /**
// * Returns the head of the priority queue without removing it
// **/
// void *
// priority_queue_get_head(struct priority_queue *self)
// {
// ck_spinlock_fas_lock(&self->lock);
// ck_spinlock_fas_unlock(&self->lock);
// }
uint64_t uint64_t
priority_queue_peek(struct priority_queue *self) priority_queue_peek(struct priority_queue *self)
{ {

@ -142,16 +142,8 @@ done:
curr->total_time = __getcycles() - curr->start_time; curr->total_time = __getcycles() - curr->start_time;
uint64_t total_time_us = curr->total_time / runtime_processor_speed_MHz; uint64_t total_time_us = curr->total_time / runtime_processor_speed_MHz;
// TODO: Refactor to log file
printf("%s():%d, %d, %lu\n", curr->module->name, curr->module->port, curr->module->relative_deadline_us, printf("%s():%d, %d, %lu\n", curr->module->name, curr->module->port, curr->module->relative_deadline_us,
total_time_us); total_time_us);
// if (end_time < curr->absolute_deadline) {
// printf("meadDeadline Met with %f us to spare\n",
// (curr->absolute_deadline - end_time) / runtime_processor_speed_MHz);
// } else {
// printf("Deadline NOT MET! Overran by %f us\n",
// (end_time - curr->absolute_deadline) / runtime_processor_speed_MHz);
// }
#ifndef USE_HTTP_UVIO #ifndef USE_HTTP_UVIO
int r = send(curr->client_socket_descriptor, curr->request_response_data, sndsz, 0); int r = send(curr->client_socket_descriptor, curr->request_response_data, sndsz, 0);

@ -75,6 +75,14 @@ sandbox_run_queue_fifo_append(struct sandbox *sandbox_to_append)
return sandbox_to_append; return sandbox_to_append;
} }
// Conditionally checks to see if current sandbox should be preempted
// FIFO doesn't preempt, so just return.
void
sandbox_run_queue_fifo_preempt(ucontext_t *user_context)
{
return;
}
void void
sandbox_run_queue_fifo_initialize() sandbox_run_queue_fifo_initialize()
@ -85,7 +93,7 @@ sandbox_run_queue_fifo_initialize()
sandbox_run_queue_config_t config = { .add = sandbox_run_queue_fifo_append, sandbox_run_queue_config_t config = { .add = sandbox_run_queue_fifo_append,
.is_empty = sandbox_run_queue_fifo_is_empty, .is_empty = sandbox_run_queue_fifo_is_empty,
.delete = sandbox_run_queue_fifo_remove, .delete = sandbox_run_queue_fifo_remove,
.get_next = sandbox_run_queue_fifo_get_next }; .get_next = sandbox_run_queue_fifo_get_next,
.preempt = sandbox_run_queue_fifo_preempt };
sandbox_run_queue_initialize(&config); sandbox_run_queue_initialize(&config);
} };

@ -13,7 +13,6 @@ bool
sandbox_run_queue_ps_is_empty() sandbox_run_queue_ps_is_empty()
{ {
int length = priority_queue_length(&sandbox_run_queue_ps); int length = priority_queue_length(&sandbox_run_queue_ps);
if (length > 1) printf("[is_empty] Runqueue length: %d\n", length);
assert(length < 5); assert(length < 5);
return priority_queue_length(&sandbox_run_queue_ps) == 0; return priority_queue_length(&sandbox_run_queue_ps) == 0;
} }
@ -27,16 +26,14 @@ static struct sandbox *
sandbox_run_queue_ps_add(struct sandbox *sandbox) sandbox_run_queue_ps_add(struct sandbox *sandbox)
{ {
int original_length = priority_queue_length(&sandbox_run_queue_ps); int original_length = priority_queue_length(&sandbox_run_queue_ps);
if (original_length > 1) printf("[Add] Original Runqueue length: %d\n", original_length);
int return_code = priority_queue_enqueue(&sandbox_run_queue_ps, sandbox, "Run Queue"); int return_code = priority_queue_enqueue(&sandbox_run_queue_ps, sandbox, "Runqueue");
if (return_code == -1) { if (return_code == -1) {
printf("Thread Runqueue is full!\n"); printf("Thread Runqueue is full!\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int final_length = priority_queue_length(&sandbox_run_queue_ps); int final_length = priority_queue_length(&sandbox_run_queue_ps);
if (final_length > 1) printf("[Add] Final Runqueue length: %d\n", final_length);
assert(final_length == original_length + 1); assert(final_length == original_length + 1);
@ -47,7 +44,7 @@ sandbox_run_queue_ps_add(struct sandbox *sandbox)
/** /**
* *
* @returns A Sandbox Request or NULL * @returns A Sandbox or NULL
**/ **/
static struct sandbox * static struct sandbox *
sandbox_run_queue_ps_remove(void) sandbox_run_queue_ps_remove(void)
@ -56,7 +53,7 @@ sandbox_run_queue_ps_remove(void)
} }
/** /**
* @returns A Sandbox Request or NULL * @returns A Sandbox or NULL
**/ **/
static void static void
sandbox_run_queue_ps_delete(struct sandbox *sandbox) sandbox_run_queue_ps_delete(struct sandbox *sandbox)
@ -67,7 +64,8 @@ sandbox_run_queue_ps_delete(struct sandbox *sandbox)
} }
/** /**
* This function determines the next sandbox to run. This is either the head of the runqueue or the * This function determines the next sandbox to run. This is either the head of the runqueue or the head of the request
*queue
* *
* Execute the sandbox at the head of the thread local runqueue * Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head * If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
@ -76,23 +74,25 @@ sandbox_run_queue_ps_delete(struct sandbox *sandbox)
struct sandbox * struct sandbox *
sandbox_run_queue_ps_get_next() sandbox_run_queue_ps_get_next()
{ {
if (sandbox_run_queue_ps.first_free != 1) { if (sandbox_run_queue_is_empty()) {
printf("Runqueue First Free: %d\n", sandbox_run_queue_ps.first_free); // Try to pull a sandbox request and return NULL if we're unable to get one
sandbox_request_t *sandbox_request;
if ((sandbox_request = sandbox_request_scheduler_remove()) == NULL) {
// printf("Global Request Queue was empty!\n");
return NULL;
};
// Otherwise, allocate the sandbox request as a runnable sandbox and place on the runqueue
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
sandbox->state = RUNNABLE;
sandbox_run_queue_ps_add(sandbox);
return sandbox;
} }
// Try to pull a sandbox request and return NULL if we're unable to get one // Resume the sandbox at the top of the runqueue
sandbox_request_t *sandbox_request; struct sandbox *sandbox = sandbox_run_queue_ps_remove();
if ((sandbox_request = sandbox_request_scheduler_remove()) == NULL) {
// printf("Global Request Queue was empty!\n");
return NULL;
};
// Otherwise, allocate the sandbox request as a runnable sandbox and place on the runqueue
printf("Sandbox Runqueue was empty, so adding sandbox from global queue\n");
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
sandbox->state = RUNNABLE;
sandbox_run_queue_ps_add(sandbox); sandbox_run_queue_ps_add(sandbox);
return sandbox; return sandbox;
} }
@ -122,6 +122,7 @@ sandbox_run_queue_ps_preempt(ucontext_t *user_context)
uint64_t local_deadline = priority_queue_peek(&sandbox_run_queue_ps); uint64_t local_deadline = priority_queue_peek(&sandbox_run_queue_ps);
uint64_t global_deadline = sandbox_request_scheduler_peek(); uint64_t global_deadline = sandbox_request_scheduler_peek();
// Our local deadline should only be ULONG_MAX if our local runqueue is empty
if (local_deadline == ULONG_MAX) { assert(sandbox_run_queue_ps.first_free == 1); }; if (local_deadline == ULONG_MAX) { assert(sandbox_run_queue_ps.first_free == 1); };
// If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it // If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it

@ -41,7 +41,6 @@ extern pthread_t runtime_worker_threads[];
***************************************/ ***************************************/
static inline void software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void *user_context_raw); static inline void software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void *user_context_raw);
static inline void software_interrupt_schedule_alarm(void *user_context_raw);
/** /**
* The handler function for Software Interrupts (signals) * The handler function for Software Interrupts (signals)
@ -64,29 +63,34 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
case SIGALRM: { case SIGALRM: {
// SIGALRM is the preemption signal that occurs every quantum of execution // SIGALRM is the preemption signal that occurs every quantum of execution
// A POSIX signal is delivered to one of the threads in our process. If sent by the kernel, "broadcast"
// by forwarding to all all threads
if (signal_info->si_code == SI_KERNEL) { if (signal_info->si_code == SI_KERNEL) {
// deliver signal to all other worker threads..
for (int i = 0; i < WORKER_THREAD_CORE_COUNT; i++) { for (int i = 0; i < WORKER_THREAD_CORE_COUNT; i++) {
if (pthread_self() == runtime_worker_threads[i]) continue; if (pthread_self() == runtime_worker_threads[i]) continue;
pthread_kill(runtime_worker_threads[i], SIGALRM); pthread_kill(runtime_worker_threads[i], SIGALRM);
} }
} else { } else {
// What is this? // If not sent by the kernel, this should be a signal forwarded from another thread
assert(signal_info->si_code == SI_TKILL); assert(signal_info->si_code == SI_TKILL);
} }
// debuglog("alrm:%d\n", software_interrupt_SIGALRM_count); // debuglog("alrm:%d\n", software_interrupt_SIGALRM_count);
software_interrupt_SIGALRM_count++; software_interrupt_SIGALRM_count++;
// software_interrupt_supported_signals per-core..
// if the current sandbox is NULL or not in a RETURNED state
if (current_sandbox && current_sandbox->state == RETURNED) return; if (current_sandbox && current_sandbox->state == RETURNED) return;
// and the next context is NULL
if (worker_thread_next_context) return; if (worker_thread_next_context) return;
// and software interrupts are not disabled
if (!software_interrupt_is_enabled()) return; if (!software_interrupt_is_enabled()) return;
software_interrupt_schedule_alarm(user_context); // Preempt
sandbox_run_queue_preempt(user_context);
break; return;
} }
case SIGUSR1: { case SIGUSR1: {
// SIGUSR1 restores the preempted sandbox stored in worker_thread_next_context
// make sure sigalrm doesn't mess this up if nested.. // make sure sigalrm doesn't mess this up if nested..
assert(!software_interrupt_is_enabled()); assert(!software_interrupt_is_enabled());
/* we set current before calling pthread_kill! */ /* we set current before calling pthread_kill! */
@ -110,17 +114,6 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
#endif #endif
} }
/**
* Preempt the current sandbox and start executing the next sandbox
* @param user_context_raw void* to a user_context struct
**/
static inline void
software_interrupt_schedule_alarm(void *user_context_raw)
{
ucontext_t *user_context = (ucontext_t *)user_context_raw;
sandbox_run_queue_preempt(user_context);
}
/*************************************** /***************************************
* Public Functions * Public Functions
***************************************/ ***************************************/

@ -27,7 +27,7 @@
* Worker Thread State * * Worker Thread State *
**************************/ **************************/
// context pointer to switch to when this thread gets a SIGUSR1 // context pointer used to store and restore a preempted sandbox. SIGUSR1
__thread arch_context_t *worker_thread_next_context = NULL; __thread arch_context_t *worker_thread_next_context = NULL;
// context of the runtime thread before running sandboxes or to resume its "main". // context of the runtime thread before running sandboxes or to resume its "main".
@ -44,8 +44,8 @@ static __thread bool worker_thread_is_in_callback;
*************************************************/ *************************************************/
/** /**
* @brief Switches to the next sandbox, placing the current sandbox of the completion queue if in RETURNED state * @brief Switches to the next sandbox, placing the current sandbox on the completion queue if in RETURNED state
* @param next The Sandbox Context to switch to or NULL * @param next_sandbox The Sandbox Context to switch to or NULL
* @return void * @return void
*/ */
static inline void static inline void
@ -62,7 +62,8 @@ worker_thread_switch_to_sandbox(struct sandbox *next_sandbox)
// Set the current sandbox to the next // Set the current sandbox to the next
current_sandbox_set(next_sandbox); current_sandbox_set(next_sandbox);
// and switch to the associated context. But what is the purpose of worker_thread_next_context? // and switch to the associated context.
// Save the context pointer to worker_thread_next_context in case of preemption
worker_thread_next_context = next_register_context; worker_thread_next_context = next_register_context;
arch_context_switch(previous_register_context, next_register_context); arch_context_switch(previous_register_context, next_register_context);
@ -137,7 +138,8 @@ worker_thread_process_io(void)
} }
/** /**
* TODO: What is this doing? * Sends the current thread a SIGUSR1, causing a preempted sandbox to be restored
* Invoked by asm during a context switch
**/ **/
void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void) void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void)
{ {

@ -3,7 +3,7 @@
"name": "fibonacci", "name": "fibonacci",
"path": "fibonacci_wasm.so", "path": "fibonacci_wasm.so",
"port": 10000, "port": 10000,
"relative-deadline-us": 50000, "relative-deadline-us": 50000000,
"argsize": 1, "argsize": 1,
"http-req-headers": [], "http-req-headers": [],
"http-req-content-type": "text/plain", "http-req-content-type": "text/plain",
@ -17,7 +17,7 @@
"name": "fibonacci2", "name": "fibonacci2",
"path": "fibonacci_wasm.so", "path": "fibonacci_wasm.so",
"port": 10001, "port": 10001,
"relative-deadline-us": 10000, "relative-deadline-us": 50000,
"argsize": 1, "argsize": 1,
"http-req-headers": [], "http-req-headers": [],
"http-req-content-type": "text/plain", "http-req-content-type": "text/plain",

Loading…
Cancel
Save