Compare commits

..

2 Commits

@ -112,7 +112,8 @@
"condition_variable": "c",
"ostream": "c",
"stop_token": "c",
"dag_data_split.h": "c"
"dag_data_split.h": "c",
"scheduler.h": "c"
},
"files.exclude": {
"**/.git": true,

@ -0,0 +1,46 @@
Summary:
Total: 30.0061 secs
Slowest: 0.1721 secs
Fastest: 0.0035 secs
Average: 0.0088 secs
Requests/sec: 227.3541
Total data: 327456 bytes
Size/request: 48 bytes
Response time histogram:
0.004 [1] |
0.020 [6695] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.037 [114] |■
0.054 [4] |
0.071 [0] |
0.088 [5] |
0.105 [1] |
0.122 [1] |
0.138 [0] |
0.155 [0] |
0.172 [1] |
Latency distribution:
10% in 0.0059 secs
25% in 0.0068 secs
50% in 0.0079 secs
75% in 0.0095 secs
90% in 0.0119 secs
95% in 0.0143 secs
99% in 0.0237 secs
Details (average, fastest, slowest):
DNS+dialup: 0.0004 secs, 0.0035 secs, 0.1721 secs
DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
req write: 0.0001 secs, 0.0000 secs, 0.0067 secs
resp wait: 0.0081 secs, 0.0031 secs, 0.1715 secs
resp read: 0.0002 secs, 0.0000 secs, 0.0049 secs
Status code distribution:
[200] 6822 responses

@ -15,4 +15,5 @@ void admissions_info_initialize(struct admissions_info *self, char* module_name,
void admissions_info_update(struct admissions_info *self, uint64_t execution_duration);
uint64_t admission_info_get_percentile(struct admissions_info *self);
uint64_t admission_info_get_average(struct admissions_info *self);

@ -83,8 +83,8 @@ struct module {
char **next_module_names; /* the next modules name in the DAG */
uint32_t next_module_count;
uint32_t pre_module_count;
bool runtime_visited;
uint32_t run_priority;
bool runtime_visited; /* used for calculating the estimated time */
uint32_t run_priority;/* Used for prioritizing data fan-in to a node */
};
/*************************

@ -24,6 +24,7 @@ perf_window_initialize(struct perf_window *self, char* module_name)
self->count = 0;
memset(&self->by_duration, 0, sizeof(struct execution_node) * PERF_WINDOW_BUFFER_SIZE);
memset(&self->by_termination, 0, sizeof(uint16_t) * PERF_WINDOW_BUFFER_SIZE);
memset(&self->by_duration_for_mdl, 0, sizeof(uint64_t) * PERF_WINDOW_BUFFER_SIZE);
}
@ -90,12 +91,14 @@ perf_window_add(struct perf_window *self, uint64_t value)
self->by_termination[i] = i;
self->by_duration[i] = (struct execution_node){ .execution_time = value,
.by_termination_idx = i };
self->by_duration_for_mdl[i] = value;
}
self->count = PERF_WINDOW_BUFFER_SIZE;
goto done;
}
/* Otherwise, replace the oldest value, and then sort */
self->by_duration_for_mdl[self->count % PERF_WINDOW_BUFFER_SIZE] = value;
uint16_t idx_of_oldest = self->by_termination[self->count % PERF_WINDOW_BUFFER_SIZE];
bool check_up = value > self->by_duration[idx_of_oldest].execution_time;
@ -154,6 +157,31 @@ perf_window_get_percentile(struct perf_window *self, int percentile, int precomp
return self->by_duration[size * percentile / 100].execution_time;
}
static inline uint64_t
perf_window_get_average(struct perf_window *self)
{
assert(self != NULL);
int size = self->count;
if (size == 0) {
return 0;
} else if (size < PERF_WINDOW_BUFFER_SIZE) {
uint64_t average = 0;
for (size_t i = 0; i < size; i++)
{
average += self->by_duration_for_mdl[i];
}
average = average / size;
return average;
} else{
uint64_t average = 0;
for (size_t i = 0; i < PERF_WINDOW_BUFFER_SIZE; i++) {
average += self->by_duration_for_mdl[i];
}
average /= PERF_WINDOW_BUFFER_SIZE;
return average;
}
}
/**
* Returns the total count of executions
* @returns total count

@ -26,6 +26,7 @@ struct execution_node {
struct perf_window {
char name[32];
struct execution_node by_duration[PERF_WINDOW_BUFFER_SIZE];
uint64_t by_duration_for_mdl[PERF_WINDOW_BUFFER_SIZE];
uint16_t by_termination[PERF_WINDOW_BUFFER_SIZE];
uint64_t count;
lock_t lock;

@ -115,6 +115,15 @@ sandbox_get_srsf_priority(void *element)
return remaining_slack;
};
static inline uint64_t
sandbox_get_mdl_priority(void *element)
{
struct sandbox *sandbox = (struct sandbox *)element;
uint64_t now = __getcycles();
uint64_t remaining_slack_mdl = sandbox->remaining_slack - (now - sandbox->last_update_timestamp);
return remaining_slack_mdl;
};
/**
* Maps a sandbox fd to an underlying host fd
* Returns error condition if the file_descriptor to set does not contain sandbox preopen magic

@ -28,7 +28,8 @@ enum SCHEDULER
{
SCHEDULER_FIFO = 0,
SCHEDULER_EDF = 1,
SCHEDULER_SRSF = 2
SCHEDULER_SRSF = 2,
SCHEDULER_MDL = 3
};
extern enum SCHEDULER scheduler;
@ -112,6 +113,47 @@ err_allocate:
request = NULL;
goto done;
}
static inline struct sandbox *
scheduler_MDL_get_next()
{
/* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox * local = local_runqueue_get_next();
uint64_t local_remaining_MDL = local == NULL ? UINT64_MAX : local->remaining_slack;
struct sandbox_request *request = NULL;
uint64_t global_remaining_slack = global_request_scheduler_peek();
/* Try to pull and allocate from the global queue if earlier
* This will be placed at the head of the local runqueue */
if (global_remaining_slack < local_remaining_MDL && (local_workload_count <=2 || local_runqueue_count == 0)) {
//if (global_remaining_slack < local_remaining_slack) {
if (global_request_scheduler_remove_if_earlier(&request, local_remaining_MDL) == 0) {
//uint64_t pop_time = __getcycles() - system_start_timestamp;
//mem_log("time %lu remove from GQ, request id:%d name %s remaining slack %lu\n", pop_time,
// request->id, request->module->name, request->remaining_slack);
assert(request != NULL);
struct sandbox *global = sandbox_allocate(request);
if (!global) goto err_allocate;
assert(global->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
}
}
/* Return what is at the head of the local runqueue or NULL if empty */
done:
return local_runqueue_get_next();
err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
static inline struct sandbox *
scheduler_fifo_get_next()
{
@ -156,6 +198,8 @@ scheduler_get_next()
return scheduler_srsf_get_next();
case SCHEDULER_FIFO:
return scheduler_fifo_get_next();
case SCHEDULER_MDL:
return scheduler_MDL_get_next();
default:
panic("Unimplemented\n");
}
@ -171,6 +215,9 @@ scheduler_initialize()
case SCHEDULER_SRSF:
global_request_scheduler_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_MDL:
global_request_scheduler_minheap_initialize(SCHEDULER_MDL);
break;
case SCHEDULER_FIFO:
global_request_scheduler_deque_initialize();
break;
@ -189,6 +236,8 @@ scheduler_runqueue_initialize()
case SCHEDULER_SRSF:
local_runqueue_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_MDL:
local_runqueue_minheap_initialize(SCHEDULER_MDL);
case SCHEDULER_FIFO:
local_runqueue_list_initialize();
break;
@ -291,6 +340,8 @@ scheduler_print(enum SCHEDULER variant)
return "FIFO";
case SCHEDULER_EDF:
return "EDF";
case SCHEDULER_MDL:
return "MDL";
case SCHEDULER_SRSF:
return "SRSF";
}

@ -42,6 +42,19 @@ admission_info_get_percentile(struct admissions_info *self)
uint64_t estimated_execution = perf_window_get_percentile(&self->perf_window, self->percentile, self->control_index);
return estimated_execution;
}
/*
* Get the average execution time of this module, no lock for accessing the queue
* @param self
* @returns the specified execution time of this module
*/
uint64_t
admission_info_get_average(struct admissions_info *self)
{
uint64_t estimated_execution = perf_window_get_average(&self->perf_window);
return estimated_execution;
}
/*
* Adds an execution value to the perf window and calculates and caches and updated estimate
* @param self

@ -80,6 +80,15 @@ sandbox_request_get_priority_srsf_fn(void *element)
return remaining_slack;
};
uint64_t
sandbox_request_get_priority_mdl_fn(void *element)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)element;
uint64_t now = __getcycles();
uint64_t remaining_slack_mdl = sandbox_request->remaining_slack - (now - sandbox_request->last_update_timestamp);
return remaining_slack_mdl;
};
/**
* Initializes the variant and registers against the polymorphic interface
*/
@ -90,6 +99,8 @@ global_request_scheduler_minheap_initialize(enum SCHEDULER scheduler)
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_fn);
} else if (scheduler == SCHEDULER_SRSF) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_srsf_fn);
} else if (scheduler == SCHEDULER_MDL) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_mdl_fn);
}
struct global_request_scheduler_config config = {

@ -7,8 +7,10 @@
#include "generic_thread.h"
#include "listener_thread.h"
#include "runtime.h"
#include "scheduler.h"
extern uint64_t system_start_timestamp;
extern enum SCHEDULER scheduler;
/*When reading the json file, the size has been determined at module.c JSON_MAX_ELEMENT_COUNT*/
const int QUEUE_SIZE = 16;
/*
@ -185,7 +187,20 @@ listener_thread_main(void *dummy)
while (rear != front)
{
struct module *current_module = queue[front++];
if (scheduler == SCHEDULER_SRSF || scheduler == SCHEDULER_EDF)
{
estimated_execution_time += admission_info_get_percentile(&current_module->admissions_info);
}else if (scheduler == SCHEDULER_MDL){
/* Set a baseline model, and if the predicted parameters of the model are more than 1.2 times the parameters of the baseline, then select the baseline parameters */
uint64_t estimated_execution_base_percentile = 0, estimated_execution_model = 0;
estimated_execution_base_percentile = admission_info_get_percentile(&current_module->admissions_info);
estimated_execution_model = admission_info_get_average(&current_module->admissions_info);
if (estimated_execution_model > 1.2 * estimated_execution_base_percentile)
{
estimated_execution_model = estimated_execution_base_percentile;
}
estimated_execution_time += estimated_execution_model;
}
for (int i = 0; i < current_module->next_module_count; i++) {
if (current_module->next_module[i] != NULL && !current_module->next_module[i]->runtime_visited)
{

@ -82,6 +82,11 @@ local_runqueue_minheap_initialize(enum SCHEDULER scheduler)
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_priority);
} else if (scheduler == SCHEDULER_SRSF) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_srsf_priority);
} else if (scheduler == SCHEDULER_MDL) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_mdl_priority);
} else {
panic("Invalid scheduler type %d\n", scheduler);
}
/* Register Function Pointers for Abstract Scheduling API */

@ -186,6 +186,8 @@ runtime_configure()
scheduler = SCHEDULER_FIFO;
} else if (strcmp(scheduler_policy, "SRSF") == 0) {
scheduler = SCHEDULER_SRSF;
} else if (strcmp(scheduler_policy, "MDL") == 0) {
scheduler = SCHEDULER_MDL;
} else {
panic("Invalid scheduler policy: %s. Must be {EDF|FIFO}\n", scheduler_policy);
}
@ -197,7 +199,7 @@ runtime_configure()
if (strcmp(sigalrm_policy, "BROADCAST") == 0) {
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
} else if (strcmp(sigalrm_policy, "TRIAGED") == 0) {
if (unlikely(scheduler != SCHEDULER_EDF && scheduler != SCHEDULER_SRSF)) panic("triaged sigalrm handlers are only valid with EDF and SRSF\n");
if (unlikely(scheduler != SCHEDULER_EDF && scheduler != SCHEDULER_SRSF && scheduler != SCHEDULER_MDL)) panic("triaged sigalrm handlers are only valid with EDF and SRSF\n");
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_TRIAGED;
} else {
panic("Invalid sigalrm policy: %s. Must be {BROADCAST|TRIAGED}\n", sigalrm_policy);

@ -94,6 +94,11 @@ sigalrm_propagate_workers(siginfo_t *signal_info)
uint64_t global_slack = global_request_scheduler_peek();
if (global_slack < local_remaining_slack) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
} else if (scheduler == SCHEDULER_MDL) {
uint64_t local_remaining_slack = runtime_worker_threads_remaining_slack[i];
uint64_t global_slack = global_request_scheduler_peek();
if (global_slack < local_remaining_slack) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
}
}
case RUNTIME_SIGALRM_HANDLER_BROADCAST: {
@ -253,12 +258,13 @@ done:
void
software_interrupt_arm_timer(void)
{
if (!runtime_preemption_enabled) return;
/* if preemption disabled, broadcast sig alarm to all other threads to record the queuelength info */
if (!runtime_preemption_enabled) {
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
}
if (!runtime_preemption_enabled) return;
struct itimerval interval_timer;
memset(&interval_timer, 0, sizeof(struct itimerval));

File diff suppressed because one or more lines are too long

@ -43,7 +43,7 @@ sod:
@make dir samples.so -C ./sod/
@cp ./sod/bin/license_plate_detection.so ${SLEDGE_BIN_DIR}/lpd_wasm.so
@cp ./sod/bin/resize_image.so ${SLEDGE_BIN_DIR}/resize_wasm.so
@cp ./sod/bin/reverse.so ${SLEDGE_BIN_DIR}/reverse_wasm.so
# @cp ./sod/bin/reverse.so ${SLEDGE_BIN_DIR}/reverse_wasm.so
C-Image-Manip:
@echo "Making and Installing pngPlay"

@ -21,8 +21,8 @@ echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2400
export SLEDGE_SCHEDULER=SRSF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SCHEDULER=MDL
export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=1
#export SLEDGE_SCHEDULER=EDF
@ -30,8 +30,8 @@ export SLEDGE_SANDBOX_PERF_LOG=$path/$output
echo $SLEDGE_SANDBOX_PERF_LOG
cd $project_path/runtime/bin
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/graph.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_dag_image.json

Loading…
Cancel
Save