feat: Implement peek in ps

Branch: main
Author: Sean McBride, 5 years ago
Parent: eef45c5983
Commit: 3caecadefe

@ -5,6 +5,7 @@
#include <string.h>
#include <ucontext.h>
#include <unistd.h>
#include <stdbool.h>
#define ARCH_NREGS (16 /* GP registers */ + 1 /* for IP */)
#define ARCH_SIG_JMP_OFF 8
@ -74,27 +75,38 @@ static void __attribute__((noinline)) arch_context_init(arch_context_t *actx, re
*(actx->regs + 16) = ip;
}
/**
* Restore a sandbox context into the mcontext of the current thread so execution resumes there when the signal handler returns
* @param mc - the context of the current thread of execution
* @param ctx - the context that we want to restore
* @return {0,1} 0 = context restored successfully. 1 = special processing because thread was last in a user-level
* context switch state
**/
static int
arch_mcontext_restore(mcontext_t *mc, arch_context_t *ctx)
{
assert(ctx != &worker_thread_base_context);
// if ctx->regs[5] is set, this was last in a user-level context switch state!
// else restore mcontext..
if (ctx->regs[5]) {
// if ctx->regs[5] is set, this was last in a user-level context switch state
bool did_user_level_context_switch = ctx->regs[5];
if (did_user_level_context_switch) {
mc->gregs[REG_RSP] = ctx->regs[5];
mc->gregs[REG_RIP] = ctx->regs[16] + ARCH_SIG_JMP_OFF;
ctx->regs[5] = 0;
return 1;
} else {
// Restore mcontext
memcpy(mc, &ctx->mctx, sizeof(mcontext_t));
memset(&ctx->mctx, 0, sizeof(mcontext_t));
}
return 0;
}
}
/**
* Save the context of the currently executing process
* @param ctx - destination
* @param mc - source
**/
static void
arch_mcontext_save(arch_context_t *ctx, mcontext_t *mc)
{
@ -104,27 +116,26 @@ arch_mcontext_save(arch_context_t *ctx, mcontext_t *mc)
memcpy(&ctx->mctx, mc, sizeof(mcontext_t));
}
/**
* @param current - the registers and context of the thing running
* @param next - the registers and context of what we're switching to
* @return always returns 0, indicating success
*
* NULL in either of these values indicates the "no sandbox to execute" state,
* which defaults to resuming execution of main
**/
static inline int
arch_context_switch(arch_context_t *ca, arch_context_t *na)
arch_context_switch(arch_context_t *current, arch_context_t *next)
{
if (!ca) {
assert(na);
// switching from "no sandbox to execute" state to "executing a sandbox"
ca = &worker_thread_base_context;
} else if (!na) {
assert(ca);
// switching from "executing a sandbox" to "no execution" state.
na = &worker_thread_base_context;
} else {
assert(na && ca);
// if both current and next are NULL, there is no state change
assert(current != NULL || next != NULL);
// switching between sandboxes.
}
// Set any NULLs to worker_thread_base_context to resume execution of main
if (current == NULL) current = &worker_thread_base_context;
if (next == NULL) next = &worker_thread_base_context;
reg_t *cr = ca->regs, *nr = na->regs;
assert(cr && nr);
reg_t *current_registers = current->regs, *next_registers = next->regs;
assert(current_registers && next_registers);
asm volatile("pushq %%rbp\n\t"
"movq %%rsp, %%rbp\n\t"
@ -143,7 +154,7 @@ arch_context_switch(arch_context_t *ca, arch_context_t *na)
"3:\n\t"
"popq %%rbp\n\t"
:
: "a"(cr), "b"(nr)
: "a"(current_registers), "b"(next_registers)
: "memory", "cc", "rcx", "rdx", "rsi", "rdi", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15",
"xmm0", "xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7", "xmm8", "xmm9", "xmm10", "xmm11",
"xmm12", "xmm13", "xmm14", "xmm15");

@ -88,7 +88,6 @@ extern __thread arch_context_t *worker_thread_next_context;
extern void worker_thread_block_current_sandbox(void);
extern void worker_thread_exit_current_sandbox(void);
extern struct sandbox *worker_thread_get_next_sandbox(void);
extern void worker_thread_process_io(void);
extern void __attribute__((noreturn)) worker_thread_sandbox_switch_preempt(void);
extern void worker_thread_wakeup_sandbox(sandbox_t *sandbox);

@ -9,12 +9,14 @@ typedef struct sandbox *(*sandbox_run_queue_add_t)(struct sandbox *);
typedef struct sandbox *(*sandbox_run_queue_remove_t)(void);
typedef bool (*sandbox_run_queue_is_empty_t)(void);
typedef void (*sandbox_run_queue_delete_t)(struct sandbox *sandbox);
typedef struct sandbox *(*sandbox_run_queue_get_next_t)();
typedef struct sandbox_run_queue_config_t {
sandbox_run_queue_add_t add;
sandbox_run_queue_is_empty_t is_empty;
sandbox_run_queue_remove_t remove;
sandbox_run_queue_delete_t delete;
sandbox_run_queue_get_next_t get_next;
} sandbox_run_queue_config_t;
@ -24,5 +26,7 @@ struct sandbox *sandbox_run_queue_add(struct sandbox *);
void sandbox_run_queue_delete(struct sandbox *);
struct sandbox *sandbox_run_queue_remove();
bool sandbox_run_queue_is_empty();
struct sandbox *sandbox_run_queue_get_next();
#endif /* SFRT_SANDBOX_RUN_QUEUE_H */
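The new get_next hook slots into the existing config struct alongside add, is_empty, remove, and delete. A minimal registration sketch with hypothetical example_* stubs standing in for a real policy's handlers; the FIFO and ps initializers later in this commit follow the same shape.

#include <stdbool.h>
#include <stddef.h>
#include "sandbox_run_queue.h"

/* Hypothetical stubs: a real policy backs these with its own data structure. */
static struct sandbox *example_add(struct sandbox *sandbox) { return sandbox; }
static bool            example_is_empty(void) { return true; }
static struct sandbox *example_remove(void) { return NULL; }
static void            example_delete(struct sandbox *sandbox) { (void)sandbox; }
static struct sandbox *example_get_next(void) { return NULL; }

void
sandbox_run_queue_example_initialize()
{
        sandbox_run_queue_config_t config = { .add      = example_add,
                                              .is_empty = example_is_empty,
                                              .remove   = example_remove,
                                              .delete   = example_delete,
                                              .get_next = example_get_next };
        sandbox_run_queue_initialize(&config);
}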

@ -10,6 +10,7 @@
***************************************/
extern __thread volatile sig_atomic_t software_interrupt_is_disabled;
extern uint64_t SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES;
/***************************************
* Public Static Inlines

@ -198,6 +198,8 @@ main(int argc, char **argv)
memset(runtime_worker_threads, 0, sizeof(pthread_t) * WORKER_THREAD_CORE_COUNT);
runtime_processor_speed_MHz = runtime_get_processor_speed_MHz();
SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES = (uint64_t)SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_USEC
* runtime_processor_speed_MHz;
printf("Detected processor speed of %f MHz\n", runtime_processor_speed_MHz);
runtime_set_resource_limits_to_max();
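The assignment to SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES above treats the detected clock rate in MHz as cycles per microsecond, so the interval in cycles is simply usec × MHz. A small sanity-check sketch with illustrative numbers; the 5 ms interval and 2400 MHz clock are assumptions, not values from this commit.

#include <assert.h>
#include <stdint.h>

int
main(void)
{
        uint64_t interval_in_usec    = 5000;   /* assumed 5 ms preemption quantum */
        double   processor_speed_MHz = 2400.0; /* assumed 2.4 GHz core: 2400 cycles per usec */

        /* Same usec-to-cycles arithmetic as the runtime's main(): cycles = usec * MHz */
        uint64_t interval_in_cycles = (uint64_t)(interval_in_usec * processor_speed_MHz);
        assert(interval_in_cycles == 12000000ULL); /* 5000 usec * 2400 cycles/usec */
        return 0;
}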

@ -44,8 +44,8 @@ runtime_initialize(void)
assert(runtime_epoll_file_descriptor >= 0);
// Allocate and Initialize the global deque
// sandbox_request_scheduler_fifo_initialize();
sandbox_request_scheduler_ps_initialize();
sandbox_request_scheduler_fifo_initialize();
// sandbox_request_scheduler_ps_initialize();
// Mask Signals
software_interrupt_mask_signal(SIGUSR1);

@ -191,7 +191,7 @@ current_sandbox_main(void)
// FIXME: is this right? This is the first time this sandbox is running, so it won't
// return to the worker_thread_switch_to_sandbox() API.
// We'd potentially do what we do in worker_thread_switch_to_sandbox() here for cleanup.
if (!software_interrupt_is_enabled()) {
if (software_interrupt_is_enabled() == false) {
arch_context_init(&current_sandbox->ctxt, 0, 0);
worker_thread_next_context = NULL;
software_interrupt_enable();

@ -26,3 +26,10 @@ sandbox_request_scheduler_remove()
assert(sandbox_request_scheduler.remove != NULL);
return sandbox_request_scheduler.remove();
}
uint64_t
sandbox_request_scheduler_peek()
{
assert(sandbox_request_scheduler.peek != NULL);
return sandbox_request_scheduler.peek();
}
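peek is non-destructive: it reports the absolute deadline (in cycles) of the request at the head of the global request queue without dequeuing it, which is how the ps runqueue below weighs global work against local work. A sketch of what a backing implementation could look like, using a hypothetical deadline-sorted list and a UINT64_MAX empty sentinel; none of this is code from the commit.

#include <stddef.h>
#include <stdint.h>

/* Hypothetical request node; the real scheduler's request type may differ. */
struct example_request {
        uint64_t                absolute_deadline; /* in cycles */
        struct example_request *next;
};

/* Head of a list kept sorted by ascending deadline. */
static struct example_request *example_request_head = NULL;

uint64_t
example_request_scheduler_peek(void)
{
        /* Earliest pending deadline, or UINT64_MAX when no requests are queued. */
        if (example_request_head == NULL) return UINT64_MAX;
        return example_request_head->absolute_deadline;
}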

@ -38,3 +38,10 @@ sandbox_run_queue_is_empty()
{
return sandbox_run_queue.is_empty();
}
struct sandbox *
sandbox_run_queue_get_next()
{
assert(sandbox_run_queue.get_next != NULL);
return sandbox_run_queue.get_next();
}

@ -1,5 +1,6 @@
#include "sandbox_run_queue_fifo.h"
#include "sandbox_run_queue.h"
#include "sandbox_request_scheduler.h"
__thread static struct ps_list_head sandbox_run_queue_fifo;
@ -26,6 +27,35 @@ sandbox_run_queue_fifo_remove(struct sandbox *sandbox_to_remove)
ps_list_rem_d(sandbox_to_remove);
}
/**
* Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull the next sandbox request from the global request queue, allocate it as a sandbox, and execute it
* @return the sandbox to execute or NULL if none are available
**/
struct sandbox *
sandbox_run_queue_fifo_get_next()
{
if (sandbox_run_queue_is_empty()) {
sandbox_request_t *sandbox_request = sandbox_request_scheduler_remove();
if (sandbox_request == NULL) return NULL;
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
return sandbox;
}
// Execute Round Robin Scheduling Logic
struct sandbox *next_sandbox = sandbox_run_queue_remove();
assert(next_sandbox->state != RETURNED);
sandbox_run_queue_add(next_sandbox);
debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name);
return next_sandbox;
}
// Append a sandbox to the runqueue
struct sandbox *
sandbox_run_queue_fifo_append(struct sandbox *sandbox_to_append)
@ -55,7 +85,8 @@ sandbox_run_queue_fifo_initialize()
sandbox_run_queue_config_t config = { .add = sandbox_run_queue_fifo_append,
.is_empty = sandbox_run_queue_fifo_is_empty,
.remove = sandbox_run_queue_fifo_remove_and_return,
.delete = sandbox_run_queue_fifo_remove };
.delete = sandbox_run_queue_fifo_remove,
.get_next = sandbox_run_queue_fifo_get_next };
sandbox_run_queue_initialize(&config);
}

@ -1,6 +1,7 @@
#include "sandbox_run_queue_ps.h"
#include "sandbox_run_queue.h"
#include "priority_queue.h"
#include "sandbox_request_scheduler.h"
// Local State
__thread static struct priority_queue sandbox_run_queue_ps;
@ -46,6 +47,61 @@ sandbox_run_queue_ps_delete(struct sandbox *sandbox)
assert(rc != -2);
}
/**
* Determines the next sandbox to run: either the head of the thread-local runqueue or the head of the global
* request queue, whichever has the earlier effective deadline.
* If the runqueue is empty, pull the next sandbox request from the global request queue, allocate it as a
* sandbox, and execute it.
* @return the sandbox to execute or NULL if none are available
**/
struct sandbox *
sandbox_run_queue_ps_get_next()
{
// At any point, we may need to run the head of the global request queue, the head of the local runqueue, or
// continue executing the current sandbox. Continuing the current sandbox should be the fast path, resuming
// without a context switch.
// If the runqueue is empty, the current sandbox has run to completion.
// We assume that the current sandbox is always on the runqueue while it is runnable, so we know it is the
// highest-priority entry on the runqueue.
// Case 1: Current runqueue is empty, so pull from global queue and add to runqueue
if (sandbox_run_queue_is_empty()) {
sandbox_request_t *sandbox_request = sandbox_request_scheduler_remove();
if (sandbox_request == NULL) return NULL;
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
return sandbox;
}
// Case 2: Current runqueue is not empty, so compare head of runqueue to head of global request queue and return
// highest priority
uint64_t global_deadline = sandbox_request_scheduler_peek() - SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES;
// This should be refactored to peek at the top of the runqueue
struct sandbox *head_of_runqueue = sandbox_run_queue_remove();
uint64_t local_deadline = head_of_runqueue->absolute_deadline;
sandbox_run_queue_add(head_of_runqueue);
if (local_deadline <= global_deadline) {
return head_of_runqueue;
} else {
sandbox_request_t *sandbox_request = sandbox_request_scheduler_remove();
struct sandbox * sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
debuglog("[%p: %s]\n", sandbox, sandbox->module->name);
return sandbox;
}
}
uint64_t
sandbox_get_priority(void *element)
{
@ -63,7 +119,8 @@ sandbox_run_queue_ps_initialize()
sandbox_run_queue_config_t config = { .add = sandbox_run_queue_ps_add,
.is_empty = sandbox_run_queue_ps_is_empty,
.remove = sandbox_run_queue_ps_remove,
.delete = sandbox_run_queue_ps_delete };
.delete = sandbox_run_queue_ps_delete,
.get_next = sandbox_run_queue_ps_get_next };
sandbox_run_queue_initialize(&config);
}
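Back in sandbox_run_queue_ps_get_next, the subtraction of SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES means the head of the local runqueue keeps the CPU only if its deadline is earlier than the global head's by at least one preemption quantum; otherwise the scheduler admits the global request. A hedged restatement of just that comparison; the helper name and standalone framing are illustrative, not part of this commit.

#include <stdbool.h>
#include <stdint.h>

/* Illustrative helper mirroring the comparison in sandbox_run_queue_ps_get_next.
 * As in the original, the subtraction is unsigned, so absolute deadlines are
 * assumed to be much larger than one quantum. */
static inline bool
example_local_head_wins(uint64_t local_deadline, uint64_t global_head_deadline,
                        uint64_t interrupt_interval_in_cycles)
{
        uint64_t discounted_global_deadline = global_head_deadline - interrupt_interval_in_cycles;
        return local_deadline <= discounted_global_deadline;
}

When this returns false, ps_get_next pulls the head request, allocates it, places it on the local runqueue, and runs it, as shown above.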

@ -13,12 +13,14 @@
#include <arch/context.h>
#include <software_interrupt.h>
#include <current_sandbox.h>
#include "sandbox_run_queue.h"
/***************************************
* Process Globals
***************************************/
static const int software_interrupt_supported_signals[] = { SIGALRM, SIGUSR1 };
uint64_t SOFTWARE_INTERRUPT_INTERVAL_DURATION_IN_CYCLES;
/***************************************
* Thread Globals
@ -116,29 +118,31 @@ static inline void
software_interrupt_schedule_alarm(void *user_context_raw)
{
software_interrupt_disable(); // no nesting!
struct sandbox *current_sandbox = current_sandbox_get();
bool should_enable_software_interrupt = true;
// If current_sandbox is null, there's nothing to preempt, so let the "main" scheduler run its course.
if (current_sandbox != NULL) {
// find a next sandbox to run..
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
struct sandbox *next_sandbox = sandbox_run_queue_get_next();
if (next_sandbox != NULL && next_sandbox != current_sandbox) {
ucontext_t *user_context = (ucontext_t *)user_context_raw;
// Save context to the sandbox we're switching from
// Save the context of the currently executing sandbox before switching from it
arch_mcontext_save(&current_sandbox->ctxt, &user_context->uc_mcontext);
// current_sandbox_set on it. restore through *user_context..
// Update current_sandbox to the next sandbox
current_sandbox_set(next_sandbox);
if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt)) goto skip;
// And load the context of this new sandbox
// RC of 1 indicates that sandbox was last in a user-level context switch state,
// so do not enable software interrupts.
if (arch_mcontext_restore(&user_context->uc_mcontext, &next_sandbox->ctxt) == 1)
should_enable_software_interrupt = false;
}
}
software_interrupt_enable();
skip:
return;
if (should_enable_software_interrupt) software_interrupt_enable();
}
/***************************************

@ -106,7 +106,7 @@ worker_thread_block_current_sandbox(void)
previous_sandbox->state = BLOCKED;
// Switch to the next sandbox
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
struct sandbox *next_sandbox = sandbox_run_queue_get_next();
debuglog("[%p: %next_sandbox, %p: %next_sandbox]\n", previous_sandbox, previous_sandbox->module->name,
next_sandbox, next_sandbox ? next_sandbox->module->name : "");
software_interrupt_enable();
@ -147,32 +147,6 @@ void __attribute__((noinline)) __attribute__((noreturn)) worker_thread_sandbox_s
;
}
/**
* Pulls up to 1..n sandbox requests, allocates them as sandboxes, sets them as runnable, places them on the local
* runqueue, and then frees the sandbox requests. The batch size pulled at once is set by SANDBOX_PULL_BATCH_SIZE.
* @return the number of sandbox requests pulled
*/
static inline int
worker_thread_pull_and_process_sandbox_requests(void)
{
int total_sandboxes_pulled = 0;
while (total_sandboxes_pulled < SANDBOX_PULL_BATCH_SIZE) {
sandbox_request_t *sandbox_request;
if ((sandbox_request = sandbox_request_scheduler_remove()) == NULL) break;
// Actually allocate the sandbox for the requests that we've pulled
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
assert(sandbox);
free(sandbox_request);
// Set the sandbox as runnable and place on the local runqueue
sandbox->state = RUNNABLE;
sandbox_run_queue_add(sandbox);
total_sandboxes_pulled++;
}
return total_sandboxes_pulled;
}
/**
* Run all outstanding events in the local thread's libuv event loop
**/
@ -188,28 +162,6 @@ worker_thread_execute_libuv_event_loop(void)
worker_thread_is_in_callback = false;
}
/**
* Execute the sandbox at the head of the thread local runqueue
* If the runqueue is empty, pull a fresh batch of sandbox requests, instantiate them, and then execute the new head
* @return the sandbox to execute or NULL if none are available
**/
struct sandbox *
worker_thread_get_next_sandbox()
{
if (sandbox_run_queue_is_empty()) {
int sandboxes_pulled = worker_thread_pull_and_process_sandbox_requests();
if (sandboxes_pulled == 0) return NULL;
}
// Execute Round Robin Scheduling Logic
struct sandbox *next_sandbox = sandbox_run_queue_remove();
assert(next_sandbox->state != RETURNED);
sandbox_run_queue_add(next_sandbox);
debuglog("[%p: %s]\n", next_sandbox, next_sandbox->module->name);
return next_sandbox;
}
/**
* The entry function for sandbox worker threads
* Initializes thread-local state, unmasks signals, sets up libuv loop and
@ -221,8 +173,8 @@ worker_thread_main(void *return_code)
// Initialize Worker State
arch_context_init(&worker_thread_base_context, 0, 0);
// sandbox_run_queue_fifo_initialize();
sandbox_run_queue_ps_initialize();
sandbox_run_queue_fifo_initialize();
// sandbox_run_queue_ps_initialize();
sandbox_completion_queue_initialize();
software_interrupt_is_disabled = false;
@ -243,7 +195,7 @@ worker_thread_main(void *return_code)
if (!worker_thread_is_in_callback) worker_thread_execute_libuv_event_loop();
software_interrupt_disable();
next_sandbox = worker_thread_get_next_sandbox();
next_sandbox = sandbox_run_queue_get_next();
software_interrupt_enable();
if (next_sandbox != NULL) {
@ -272,7 +224,7 @@ worker_thread_exit_current_sandbox(void)
sandbox_run_queue_delete(previous_sandbox);
previous_sandbox->state = RETURNED;
struct sandbox *next_sandbox = worker_thread_get_next_sandbox();
struct sandbox *next_sandbox = sandbox_run_queue_get_next();
assert(next_sandbox != previous_sandbox);
software_interrupt_enable();
// Because the stack is still in use, only unmap linear memory and defer free resources until "main
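Throughout these call sites, the scheduling decision runs with software interrupts disabled, because the SIGALRM handler also reaches the runqueue via sandbox_run_queue_get_next. A condensed sketch of that pattern as it is assumed here; the helper name is illustrative, not part of this commit.

#include <software_interrupt.h>
#include "sandbox_run_queue.h"

/* Illustrative helper: pick the next sandbox with the preemption signal
 * disabled so the handler cannot race the runqueue mutation. */
static inline struct sandbox *
example_pick_next_sandbox(void)
{
        software_interrupt_disable();
        struct sandbox *next_sandbox = sandbox_run_queue_get_next();
        software_interrupt_enable();
        return next_sandbox;
}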
