implement srsf

main
xiaosuGW 3 years ago
parent 60026160a8
commit da7f5506d6

@ -13,3 +13,6 @@ struct admissions_info {
void admissions_info_initialize(struct admissions_info *self, char* module_name, int percentile, uint64_t expected_execution,
uint64_t relative_deadline);
void admissions_info_update(struct admissions_info *self, uint64_t execution_duration);
uint64_t admission_info_get_percentile(struct admissions_info *self);

@ -105,6 +105,15 @@ sandbox_get_priority(void *element)
return sandbox->absolute_deadline;
};
static inline uint64_t
sandbox_get_srsf_priority(void *element)
{
struct sandbox *sandbox = (struct sandbox *)element;
uint64_t now = __getcycles();
uint64_t remaining_slack = sandbox->remaining_slack - (now - sandbox->last_update_timestamp);
return remaining_slack;
};
/**
* Maps a sandbox fd to an underlying host fd
* Returns error condition if the file_descriptor to set does not contain sandbox preopen magic

@ -23,6 +23,8 @@ struct sandbox_request {
uint64_t request_arrival_timestamp; /* cycles */
uint64_t enqueue_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */
uint64_t last_update_timestamp; /* cycles */
uint64_t remaining_slack; /* cycles */
char * previous_function_output;
ssize_t output_length;
ssize_t previous_request_length;
@ -71,7 +73,7 @@ sandbox_request_log_allocation(struct sandbox_request *sandbox_request)
static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, bool request_from_outside, ssize_t request_length,
char *arguments, int socket_descriptor, const struct sockaddr *socket_address,
uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp,
uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp, uint64_t remaining_slack,
uint64_t admissions_estimate, char *previous_function_output, ssize_t output_length)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
@ -91,6 +93,8 @@ sandbox_request_allocate(struct module *module, bool request_from_outside, ssize
sandbox_request->previous_function_output = previous_function_output;
sandbox_request->output_length = output_length;
sandbox_request->previous_request_length = request_length;
sandbox_request->last_update_timestamp = enqueue_timestamp;
sandbox_request->remaining_slack = remaining_slack;
/*
* Admissions Control State

@ -28,6 +28,7 @@ sandbox_set_as_blocked(struct sandbox *sandbox, sandbox_state_t last_state)
switch (last_state) {
case SANDBOX_RUNNING: {
sandbox->last_update_timestamp = now;
sandbox->running_duration += duration_of_last_state;
local_runqueue_delete(sandbox);
break;

@ -41,6 +41,8 @@ sandbox_set_as_runnable(struct sandbox *sandbox, sandbox_state_t last_state)
break;
}
case SANDBOX_RUNNING: {
/* stop running the current sandbox, so record the timestamp */
sandbox->last_update_timestamp = now;
sandbox->running_duration += duration_of_last_state;
/* No need to add to runqueue, as already on it */
break;

@ -19,6 +19,8 @@ sandbox_set_as_running(struct sandbox *sandbox, sandbox_state_t last_state)
switch (last_state) {
case SANDBOX_RUNNABLE: {
sandbox->remaining_slack -= now - sandbox->last_update_timestamp;
sandbox->last_update_timestamp = now;
sandbox->runnable_duration += duration_of_last_state;
current_sandbox_set(sandbox);
runtime_worker_threads_deadline[worker_thread_idx] = sandbox->absolute_deadline;

@ -49,11 +49,13 @@ struct sandbox {
struct arch_context ctxt; /* register context for context switch. */
uint64_t request_arrival_timestamp; /* Timestamp when request is received */
uint64_t enqueue_timestamp; /* Timestamp when sandbox request is enqueued to the global queue*/
uint64_t enqueue_timestamp; /* Timestamp when sandbox request is enqueued to the global queue*/
uint64_t allocation_timestamp; /* Timestamp when sandbox is allocated */
uint64_t response_timestamp; /* Timestamp when response is sent */
uint64_t completion_timestamp; /* Timestamp when sandbox runs to completion */
uint64_t last_state_change_timestamp; /* Used for bookkeeping of actual execution time */
uint64_t last_update_timestamp; /* Used for bookkeeping timestamp for SRSF */
uint64_t remaining_slack; /* Cycles */
#ifdef LOG_SANDBOX_MEMORY_PROFILE
uint32_t page_allocation_timestamps[SANDBOX_PAGE_ALLOCATION_TIMESTAMP_COUNT];
size_t page_allocation_timestamps_size;

@ -25,7 +25,8 @@
enum SCHEDULER
{
SCHEDULER_FIFO = 0,
SCHEDULER_EDF = 1
SCHEDULER_EDF = 1,
SCHEDULER_SRSF = 2
};
extern enum SCHEDULER scheduler;
@ -64,6 +65,38 @@ err_allocate:
goto done;
}
static inline struct sandbox *
scheduler_srsf_get_next()
{
/* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox * local = local_runqueue_get_next();
uint64_t local_remaining_slack = local == NULL ? UINT64_MAX : local->remaining_slack;
struct sandbox_request *request = NULL;
uint64_t global_remaining_slack = global_request_scheduler_peek();
/* Try to pull and allocate from the global queue if earlier
* This will be placed at the head of the local runqueue */
if (global_remaining_slack < local_remaining_slack) {
if (global_request_scheduler_remove_if_earlier(&request, local_remaining_slack) == 0) {
assert(request != NULL);
struct sandbox *global = sandbox_allocate(request);
if (!global) goto err_allocate;
assert(global->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
}
}
/* Return what is at the head of the local runqueue or NULL if empty */
done:
return local_runqueue_get_next();
err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
goto done;
}
static inline struct sandbox *
scheduler_fifo_get_next()
{
@ -103,6 +136,8 @@ scheduler_get_next()
switch (scheduler) {
case SCHEDULER_EDF:
return scheduler_edf_get_next();
case SCHEDULER_SRSF:
return scheduler_srsf_get_next();
case SCHEDULER_FIFO:
return scheduler_fifo_get_next();
default:
@ -115,7 +150,10 @@ scheduler_initialize()
{
switch (scheduler) {
case SCHEDULER_EDF:
global_request_scheduler_minheap_initialize();
global_request_scheduler_minheap_initialize(SCHEDULER_EDF);
break;
case SCHEDULER_SRSF:
global_request_scheduler_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_FIFO:
global_request_scheduler_deque_initialize();
@ -130,7 +168,10 @@ scheduler_runqueue_initialize()
{
switch (scheduler) {
case SCHEDULER_EDF:
local_runqueue_minheap_initialize();
local_runqueue_minheap_initialize(SCHEDULER_EDF);
break;
case SCHEDULER_SRSF:
local_runqueue_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_FIFO:
local_runqueue_list_initialize();
@ -216,6 +257,8 @@ scheduler_print(enum SCHEDULER variant)
return "FIFO";
case SCHEDULER_EDF:
return "EDF";
case SCHEDULER_SRSF:
return "SRSF";
}
}

@ -32,7 +32,17 @@ admissions_info_initialize(struct admissions_info *self, char* module_name, int
#endif
}
/*
* Get the specified execution time of this module, no lock for accessing the queue
* @param self
* @returns the specified execution time of this module
*/
uint64_t
admission_info_get_percentile(struct admissions_info *self)
{
uint64_t estimated_execution = perf_window_get_percentile(&self->perf_window, self->percentile, self->control_index);
return estimated_execution;
}
/*
* Adds an execution value to the perf window and calculates and caches and updated estimate
* @param self

@ -126,7 +126,7 @@ current_sandbox_start(void)
next_module->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, enqueue_timestamp,
true, pre_func_output, output_length);
sandbox->remaining_slack, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/

@ -6,6 +6,7 @@
#include "panic.h"
#include "priority_queue.h"
#include "runtime.h"
#include "scheduler.h"
static struct priority_queue *global_request_scheduler_minheap;
@ -70,14 +71,26 @@ sandbox_request_get_priority_fn(void *element)
return sandbox_request->absolute_deadline;
};
uint64_t
sandbox_request_get_priority_srsf_fn(void *element)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)element;
uint64_t now = __getcycles();
uint64_t remaining_slack = sandbox_request->remaining_slack - (now - sandbox_request->last_update_timestamp);
return remaining_slack;
};
/**
* Initializes the variant and registers against the polymorphic interface
*/
void
global_request_scheduler_minheap_initialize()
global_request_scheduler_minheap_initialize(enum SCHEDULER scheduler)
{
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_request_get_priority_fn);
if (scheduler == SCHEDULER_EDF) {
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_request_get_priority_fn);
} else if (scheduler == SCHEDULER_SRSF) {
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_request_get_priority_srsf_fn);
}
struct global_request_scheduler_config config = {
.add_fn = global_request_scheduler_minheap_add,

@ -174,12 +174,21 @@ listener_thread_main(void *dummy)
continue;
}
/* get total estimated execution time */
uint64_t estimated_execution_time = admission_info_get_percentile(&module->admissions_info);
struct module * next_module = module->next_module;
while(next_module) {
estimated_execution_time += admission_info_get_percentile(&next_module->admissions_info);
next_module = next_module->next_module;
}
uint64_t remaining_slack = module->relative_deadline - estimated_execution_time;
/* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, true, 0, module->name, client_socket,
(const struct sockaddr *)&client_address,
request_arrival_timestamp, request_arrival_timestamp,
request_arrival_timestamp, request_arrival_timestamp,remaining_slack,
work_admitted, NULL, 0);
/* Add to the Global Sandbox Request Scheduler */

@ -1,5 +1,6 @@
#include <stdint.h>
#include "scheduler.h"
#include "arch/context.h"
#include "client_socket.h"
#include "current_sandbox.h"
@ -74,10 +75,14 @@ local_runqueue_minheap_get_next()
* Registers the PS variant with the polymorphic interface
*/
void
local_runqueue_minheap_initialize()
local_runqueue_minheap_initialize(enum SCHEDULER scheduler)
{
/* Initialize local state */
local_runqueue_minheap = priority_queue_initialize(256, false, sandbox_get_priority);
if (scheduler == SCHEDULER_EDF) {
local_runqueue_minheap = priority_queue_initialize(256, false, sandbox_get_priority);
} else if (scheduler == SCHEDULER_SRSF) {
local_runqueue_minheap = priority_queue_initialize(256, false, sandbox_get_srsf_priority);
}
/* Register Function Pointers for Abstract Scheduling API */
struct local_runqueue_config config = { .add_fn = local_runqueue_minheap_add,

Loading…
Cancel
Save