Compare commits

...

23 Commits

Author SHA1 Message Date
hwwang 7da0f72179 修复关于初始化时间的bug
7 months ago
hwwang ed80c30ccf 修复链式中数据流二次拷贝的问题,以及节点扇出数据二次拷贝的问题
7 months ago
hwwang 8990f5c5ed 添加机器学习框架数据获取代码,主要是获取TCP套接字报文大小
7 months ago
hwwang 87672b74c4 添加LLF调度 修改SRSF调度细节
7 months ago
hwwang 87ba8af8a0 fix time bug
8 months ago
hwwang a696655ad5 添加线性拟合计算剩余时间,并设置model_base,当预估时间超过base时间的1.2倍,改为base时间
8 months ago
hwwang 10d9659172 添加机器学习调度框架,考虑如何训练数据
8 months ago
hwwang 97b158f508 添加测试用例
9 months ago
hwwang a66ea1bc25 修改打印的bug,添加图片测试脚本
10 months ago
hwwang 4b2a905e92 为扇入节点添加处理数据的api,格式是四字节数据长度加数据
10 months ago
hwwang a056d25ff4 修改节点扇入时,不用&做分割符,改为增加前四字节作为数据长度
10 months ago
hwwang 46b4325dba 添加图测试文件
10 months ago
hwwang 8eae02580f 添加图测试文件
10 months ago
hwwang 5711145b3b 修改hash表和sandbox_request的互斥锁为读写锁,但现在觉得map不加锁可能会有隐患
11 months ago
hwwang 1854297530 将沙盒输出挂载在一个链表中,修改锁的范围,但是感觉结果不是很好
11 months ago
hwwang bae018314f 修改数据拷贝方式,关于50%的响应获取问题还有待商榷
11 months ago
hwwang 0f9b75d784 asd
11 months ago
hwwang 8ae955fbfc 修改内存释放错误的bug
11 months ago
hwwang 702f5de60d 为hash表添加锁
11 months ago
hwwang 0b1fb6c5d7 修一部分logbug
11 months ago
hwwang 7abc07445d 修改log参数,未修改完
11 months ago
hwwang 3aeed6f94b 已添加DAG,需要修改各部份时间测试代码
11 months ago
hwwang 70fdbb348c need modify thirdparty hashmap
12 months ago

@ -1,3 +1,3 @@
LD_LIBRARY_PATH=/home/hai/sledge-serverless-framework/runtime/bin
SLEDGE_SCHEDULER=EDF
SLEDGE_SANDBOX_PERF_LOG=/home/hai/sledge-serverless-framework/debuglog.txt
LD_LIBRARY_PATH=/home/hai/sledge/sledge/runtime/bin
SLEDGE_SCHEDULER=MDL
SLEDGE_SANDBOX_PERF_LOG=/home/hai/sledge/sledge/runtime/tests/runtime_sandbox_perf_log.log

5
.gitignore vendored

@ -56,6 +56,11 @@ runtime/tests/tmp/
runtime/tests/**/*.csv
runtime/tests/**/*.txt
runtime/tests/**/*.xlsx
runtime/tests/test_data
runtime/tests/*.log
runtime/data/*.txt
runtime/data/*.log
runtime/data/*.json
# Swap Files
*.swp

7
.gitmodules vendored

@ -11,6 +11,9 @@ url = https://github.com/gwsystems/ck.git
[submodule "jsmn"]
path = runtime/thirdparty/jsmn
url = https://github.com/gwsystems/jsmn.git
[submodule "hash"]
path = runtime/thirdparty/hashmap
url = https://github.com/tidwall/hashmap.c.git
[submodule "runtime/tests/gocr"]
path = runtime/tests/gocr
url = https://github.com/gwsystems/gocr.git
@ -25,8 +28,8 @@ url = https://github.com/gwsystems/CMSIS_5_NN.git
branch = sledge
[submodule "runtime/tests/sod"]
path = runtime/tests/sod
url = https://github.com/gwsystems/sod.git
branch = sledge
url = http://47.120.57.226:3000/haiwan/Sod.git
branch = main
[submodule "runtime/tests/speechtotext"]
path = runtime/tests/speechtotext
url = https://github.com/gwsystems/speechtotext.git

@ -9,7 +9,9 @@
"defines": [
"USE_MEM_VM",
"x86_64",
"_GNU_SOURCE"
"_GNU_SOURCE",
"LOG_TO_FILE",
"DEEP_LEARN_SCHDUE"
],
"cStandard": "${default}",
"compilerPath": "/usr/bin/clang",

162
.vscode/launch.json vendored

@ -1,52 +1,114 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Hyde",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/applications/ocr/hyde/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "Preemption",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/preemption/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
"version": "0.2.0",
"configurations": [
{
"name": "Hyde",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/applications/ocr/hyde/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "Preemption",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/preemption/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "graph",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/tests/graph.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"sourceFileMap": {
"/sledge/runtime": "${workspaceFolder}/runtime"
},
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "utest",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/utest/map",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"sourceFileMap": {},
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "C/C++ Runner: Debug Session",
"type": "cppdbg",
"request": "launch",
"args": [],
"stopAtEntry": false,
"externalConsole": false,
"cwd": "/home/weihao/sledge/sledge_tree/runtime/tests/noop",
"program": "/home/weihao/sledge/sledge_tree/runtime/tests/noop/build/Debug/outDebug",
"MIMode": "gdb",
"miDebuggerPath": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
}

@ -95,7 +95,26 @@
"compare": "c",
"cstdint": "c",
"format": "c",
"jsmn.h": "c"
"jsmn.h": "c",
"__bit_reference": "c",
"algorithm": "c",
"lock_time.h": "c",
"mcs.h": "c",
"map.h": "c",
"stdio.h": "c",
"hashmap.h": "c",
"mutex": "cpp",
"xmalloc.h": "c",
"stddef.h": "c",
"__mutex_base": "c",
"memory": "c",
"atomic": "c",
"condition_variable": "c",
"ostream": "c",
"stop_token": "c",
"dag_data_split.h": "c",
"scheduler.h": "c",
"priority_queue.h": "c"
},
"files.exclude": {
"**/.git": true,
@ -144,6 +163,7 @@
"**/runtime/thirdparty/http-parser/**": true,
"**/runtime/thirdparty/jsmn/**": true,
"**/runtime/thirdparty/dist/**": true,
"**/runtime/thirdparty/hashmap/**": true,
"*.o": true,
"*.bc": true,
"*.wasm": true

@ -0,0 +1 @@
12

@ -41,6 +41,14 @@ BINARY_NAME=sledgert
# Feature Toggles
# CFLAGS += -DADMISSIONS_CONTROL
# This definition is used when the module is triggered by an HTTP request.
# It retrieves the length of the HTTP message data, the hash table size, and the global queue length via a TCP socket.
# These values are then used as features for machine learning training
# CFLAGS += -DDEEP_LEARN_SCHDUE
# This definition determines whether to use the median or the mean to calculate the execution times of the first 256 services.
# CFLAGS += -DGET_AVER_TIME
# Debugging Flags
# Strips out calls to assert() and disables debuglog
@ -61,6 +69,9 @@ CFLAGS += -DLOG_TO_FILE
# CFLAGS += -DOPT_AVOID_GLOBAL_QUEUE
# CFLAGS += -DLOG_RUNTIME_FILE_LOG
CFLAGS += -DLOG_RUNTIME_MEM_LOG
# For machine learning purposes, the following data is collected and saved:
# the length of the HTTP message, the hash table size, the global queue length, and the service execution time
# CFLAGS += -DLOG_DEEP_LEARN_SCHDUE
# This dumps per module *.csv files containing the cycle a sandbox has been in RUNNING when each
# page is allocated. This helps understand the relationship to memory allocation and execution time.
@ -150,6 +161,7 @@ fetch:
@git config --global --add safe.directory /sledge/runtime/tests/speechtotext/thirdparty/sphinxbase
@git config --global --add safe.directory /sledge/runtime/thirdparty/http-parser
@git config --global --add safe.directory /sledge/runtime/thirdparty/jsmn
@git config --global --add safe.directory /sledge/runtime/thirdparty/hashmap
@git config --global --add safe.directory /sledge/runtime/tests/C-Image-Manip
@git submodule update --init --recursive

@ -0,0 +1,30 @@
import sys


def calculate_average(input_file, column_index):
    """Print the average of one comma-separated column of a file.

    Lines too short to contain the column are skipped; fields that do not
    parse as floats are reported and excluded from the average.
    """
    total = 0
    count = 0
    with open(input_file, 'r') as f:
        for line in f:
            fields = line.strip().split(',')
            if column_index >= len(fields):
                continue
            raw = fields[column_index]
            try:
                total += float(raw)
            except ValueError:
                print(f"error value: {raw}")
            else:
                count += 1
    if count > 0:
        average = total / count
        print(f"list {column_index + 1} average: {average}")
    else:
        print("no value")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(" python calculate_average.py input_file column_index")
    else:
        calculate_average(sys.argv[1], int(sys.argv[2]) - 1)

@ -0,0 +1,41 @@
# -*- coding: UTF-8 -*-
import matplotlib.pyplot as plt


def plot_max_rps_comparison():
    """Draw a grouped bar chart comparing max RPS of mixed vs. single tasks."""
    sizes = ['5KB', '40KB', '105KB', '305KB']
    mixed_rps = [121, 85, 63, 33]
    single_rps = [607, 235, 128, 45]

    positions = range(len(sizes))
    width = 0.35  # bar width; second series is shifted right by this amount

    fig, ax = plt.subplots()
    ax.set_facecolor('#f0f0f0')
    ax.bar(positions, mixed_rps, width, label='Mixed Task', color='orange')
    ax.bar([p + width for p in positions], single_rps, width,
           label='Single Task', color='skyblue')

    ax.set_xlabel('Image Size(KB)')
    ax.set_ylabel('MAX RPS')
    ax.set_title('Mixed and Single Task Performance Comparison')
    # Center the tick labels under each pair of bars
    ax.set_xticks([p + width / 2 for p in positions])
    ax.set_xticklabels(sizes)
    ax.legend()
    plt.show()


plot_max_rps_comparison()

@ -0,0 +1,51 @@
import matplotlib.pyplot as plt
from matplotlib import font_manager as fm
def load_data(filename):
    """Load whitespace-separated float values from a file into a list.

    Assumes every whitespace-delimited token in the file parses as a float.
    """
    with open(filename, 'r') as fh:
        tokens = fh.read().split()
    return [float(tok) for tok in tokens]
def main():
    """Plot deadline-miss rate versus load for the EDF and LLF schedulers."""
    # Measured miss rates (%) at each load level
    edf_data = [0.83, 1.35, 1.88, 2.36, 1.9]
    llf_data = [0.45, 0.4, 0.52, 0.97, 0.9]
    x_labels = [50, 60, 70, 80, 90]  # load levels; must match the series above

    font_properties = fm.FontProperties(family='Times New Roman', size=18)
    plt.rcParams.update({'font.size': 18, 'font.family': 'Times New Roman'})

    plt.figure(figsize=(10, 5))
    axes = plt.gca()
    axes.set_facecolor('#f0f0f0')  # light grey plot background

    plt.plot(x_labels, edf_data, marker='s', linestyle='-', color='#C8503D',
             markersize=8, label='EDF')
    plt.plot(x_labels, llf_data, marker='^', linestyle='-', color='#00008B',
             markersize=8, label='LLF')

    plt.title('5KB-1.5*Deadline', fontsize=20, fontproperties=font_properties)
    plt.xlabel('Load (% of maximum RPS)', fontproperties=font_properties)
    plt.ylabel('Deadline Miss Rate (%)', fontproperties=font_properties)
    plt.legend(prop=font_properties)
    plt.xticks(range(50, 91, 10))
    plt.grid(True)
    plt.tick_params(axis='both', which='both', length=0)  # hide tick marks
    plt.show()


if __name__ == "__main__":
    main()

@ -0,0 +1,24 @@
# split_logs.py
def split_logs(input_file):
    """Split a mixed log file into one <module>.txt file per known module.

    Each line is attributed to the first module whose name appears in it;
    lines mentioning no known module are dropped. Output files are written
    to the current directory, one per module (empty modules yield a file
    containing a single newline).
    """
    module_names = ["resize1", "png2bmp1", "lpd_wasm1", "cifar10_1", "work1"]
    buckets = {name: [] for name in module_names}
    with open(input_file, 'r') as f:
        for line in f:
            for name in module_names:
                if name in line:
                    buckets[name].append(line.strip())
                    break
    for name, entries in buckets.items():
        with open(f"{name}.txt", 'w') as outfile:
            outfile.write("\n".join(entries) + "\n")


if __name__ == "__main__":
    split_logs("sledge.log")

@ -0,0 +1,20 @@
import sys


def split_columns(input_file):
    """Transpose a comma-separated file into one output file per column.

    Column i is written to "<input stem>_column_<i+1>.txt", one value per
    line. The input filename is assumed to end in a four-character
    extension (e.g. ".csv" or ".txt"), which is stripped to form the stem.
    """
    columns = []
    with open(input_file, 'r') as f:
        for line in f:
            for idx, field in enumerate(line.strip().split(',')):
                if idx == len(columns):
                    columns.append([])
                columns[idx].append(field)
    stem = input_file[:-4]
    for idx, column in enumerate(columns):
        with open(f"{stem}_column_{idx + 1}.txt", 'w') as outfile:
            outfile.write("\n".join(column) + "\n")


if __name__ == "__main__":
    for input_file in sys.argv[1:]:
        split_columns(input_file)

File diff suppressed because one or more lines are too long

@ -15,4 +15,5 @@ void admissions_info_initialize(struct admissions_info *self, char* module_name,
void admissions_info_update(struct admissions_info *self, uint64_t execution_duration);
uint64_t admission_info_get_percentile(struct admissions_info *self);
uint64_t admission_info_get_average(struct admissions_info *self);

@ -9,12 +9,14 @@ typedef struct sandbox_request *(*global_request_scheduler_add_fn_t)(void *);
typedef int (*global_request_scheduler_remove_fn_t)(struct sandbox_request **);
typedef int (*global_request_scheduler_remove_if_earlier_fn_t)(struct sandbox_request **, uint64_t);
typedef uint64_t (*global_request_scheduler_peek_fn_t)(void);
typedef int (*global_request_scheduler_size_fn_t)(void);
struct global_request_scheduler_config {
global_request_scheduler_add_fn_t add_fn;
global_request_scheduler_remove_fn_t remove_fn;
global_request_scheduler_remove_if_earlier_fn_t remove_if_earlier_fn;
global_request_scheduler_peek_fn_t peek_fn;
global_request_scheduler_size_fn_t size_fn;
};
@ -23,3 +25,4 @@ struct sandbox_request *global_request_scheduler_add(struct sandbox_request *);
int global_request_scheduler_remove(struct sandbox_request **);
int global_request_scheduler_remove_if_earlier(struct sandbox_request **, uint64_t targed_deadline);
uint64_t global_request_scheduler_peek(void);
int global_request_scheduler_size(void);

@ -0,0 +1,180 @@
#pragma once

#include <assert.h> /* assert() is used by map_upsert below */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h> /* memcmp/memcpy are used by the inline functions below */
#include <pthread.h>

#include "xmalloc.h"

/* Simple K-V store based on The Practice of Programming by Kernighan and Pike */

/* Bucket count is sized to be a prime that is approximately 20% larger than the desired capacity (6k keys) */
#define MAP_BUCKET_COUNT 7907
#define MAP_HASH         jenkins_hash

/* A single key/value entry chained off a bucket. */
struct map_node {
	struct map_node *next;          /* next node in the same bucket chain */
	char            *key;           /* copied on insert; always owned by the map */
	void            *value;         /* owned by the map iff manage_mvalue is true */
	uint32_t         key_len;
	uint32_t         value_len;
	uint32_t         hash;          /* full hash, cached to cheapen comparisons */
	bool             manage_mvalue; /* true: map frees value; false: caller owns it */
};

struct map_bucket {
	struct map_node *head;
};

/* Fixed-size, separately-chained hash table.
 * NOTE(review): this variant has no internal locking — confirm callers serialize access. */
struct hashmap {
	struct map_bucket buckets[MAP_BUCKET_COUNT];
};
static inline void
map_init(struct hashmap *restrict map)
{
for (int i = 0; i < MAP_BUCKET_COUNT; i++) {
map->buckets[i].head = NULL;
}
}
/* See https://en.wikipedia.org/wiki/Jenkins_hash_function (one-at-a-time variant) */
static inline uint32_t
jenkins_hash(char *key, uint32_t key_len)
{
	uint32_t hash = 0;

	for (uint32_t i = 0; i < key_len; i++) {
		hash += key[i];
		hash += hash << 10;
		hash ^= hash >> 6;
	}

	/* Final avalanche */
	hash += hash << 3;
	hash ^= hash >> 11;
	hash += hash << 15;

	return hash;
}
/**
 * Look up a key.
 * @returns the stored value pointer, or NULL if absent; *ret_value_len is set
 *          to the stored length on a hit and to 0 on a miss.
 */
static inline void *
map_get(struct hashmap *map, char *key, uint32_t key_len, uint32_t *ret_value_len)
{
	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
		/* Compare lengths before memcmp: the original compared key_len bytes
		 * unconditionally, reading past shorter stored keys and matching keys
		 * that merely share a prefix. */
		if (node->hash == hash && node->key_len == key_len
		    && memcmp(node->key, key, key_len) == 0) {
			*ret_value_len = node->value_len;
			return node->value;
		}
	}

	*ret_value_len = 0;
	return NULL;
}
/**
 * Insert a key/value pair if the key is not already present.
 * @param manage_mvalue true: the map copies value and frees the copy on delete;
 *                      false: the map stores the caller's pointer and the
 *                      caller retains ownership of the memory.
 * @returns true if inserted, false if the key already existed
 */
static inline bool
map_set(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len, bool manage_mvalue)
{
	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	/* Duplicate scan: length must match before memcmp to avoid reading past
	 * shorter stored keys (the original memcmp'd key_len bytes unconditionally). */
	for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
		if (node->hash == hash && node->key_len == key_len
		    && memcmp(node->key, key, key_len) == 0)
			return false;
	}

	struct map_node *new_node = (struct map_node *)xmalloc(sizeof(struct map_node));
	*(new_node) = (struct map_node){ .hash      = hash,
	                                 .key       = xmalloc(key_len),
	                                 .key_len   = key_len,
	                                 .value_len = value_len,
	                                 .next      = bucket->head };

	/* Copy Key and (if map-managed) Value */
	memcpy(new_node->key, key, key_len);
	if (manage_mvalue) {
		new_node->value = xmalloc(value_len);
		memcpy(new_node->value, value, value_len);
	} else {
		new_node->value = value;
	}
	new_node->manage_mvalue = manage_mvalue;

	bucket->head = new_node;
	return true;
}
/**
 * Remove a key from the map, freeing the key and (if map-managed) the value.
 * @returns true if a node was deleted, false if the key was not found
 */
static inline bool
map_delete(struct hashmap *map, char *key, uint32_t key_len)
{
	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	struct map_node *prev = NULL;
	for (struct map_node *node = bucket->head; node != NULL; prev = node, node = node->next) {
		/* Length check before memcmp prevents out-of-bounds reads on shorter stored keys */
		if (node->hash != hash || node->key_len != key_len
		    || memcmp(node->key, key, key_len) != 0)
			continue;

		/* Unlink from the chain */
		if (prev == NULL) {
			bucket->head = node->next;
		} else {
			prev->next = node->next;
		}

		free(node->key);
		if (node->manage_mvalue) free(node->value);
		free(node);
		return true;
	}

	return false;
}
/**
 * Update the value stored for an existing key.
 * For a map-managed value the stored buffer is resized and the new bytes are
 * copied in. For a caller-managed value only the pointer and length are
 * updated — the original code realloc'd memory the map did not own and then
 * leaked the reallocated buffer by overwriting the pointer.
 * Panics if the key is not present.
 */
static inline void
map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len)
{
	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
		/* Length check before memcmp prevents out-of-bounds reads on shorter stored keys */
		if (node->hash != hash || node->key_len != key_len
		    || memcmp(node->key, key, key_len) != 0)
			continue;

		node->value_len = value_len;
		if (node->manage_mvalue) {
			void *resized = realloc(node->value, value_len);
			assert(resized);
			node->value = resized;
			memcpy(node->value, value, value_len);
		} else {
			node->value = value;
		}
		return;
	}

	panic("map_upsert: key not found");
}

@ -0,0 +1,246 @@
#pragma once

#include <assert.h> /* assert() is used by map_upsert below */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h> /* memcmp/memcpy are used by the inline functions below */
#include <pthread.h>
//#include "lock.h"
#include "xmalloc.h"

/* Per-bucket reader/writer lock wrappers */
typedef pthread_rwlock_t rwlock_t;
#define LOCK_INIT_RW(lock)   pthread_rwlock_init(lock, NULL)
#define LOCK_RDLOCK_RW(lock) pthread_rwlock_rdlock(lock)
#define LOCK_WRLOCK_RW(lock) pthread_rwlock_wrlock(lock)
#define LOCK_UNLOCK_RW(lock) pthread_rwlock_unlock(lock)

/* Simple K-V store based on The Practice of Programming by Kernighan and Pike */

/* Bucket count is sized to be a prime that is approximately 20% larger than the desired capacity (6k keys) */
#define MAP_BUCKET_COUNT 7907
#define MAP_HASH         jenkins_hash

/* A single key/value entry chained off a bucket. */
struct map_node {
	struct map_node *next;          /* next node in the same bucket chain */
	char            *key;           /* copied on insert; always owned by the map */
	void            *value;         /* owned by the map iff manage_mvalue is true */
	uint32_t         key_len;
	uint32_t         value_len;
	uint32_t         hash;          /* full hash, cached to cheapen comparisons */
	bool             manage_mvalue; /* true: map frees value; false: caller owns it */
};

/* Each bucket carries its own rwlock, so operations on different buckets never contend. */
struct map_bucket {
	rwlock_t         lock;
	struct map_node *head;
};

struct hashmap {
	struct map_bucket buckets[MAP_BUCKET_COUNT];
};
static inline void
map_init(struct hashmap *restrict map)
{
for (int i = 0; i < MAP_BUCKET_COUNT; i++) {
map->buckets[i].head = NULL;
LOCK_INIT_RW(&map->buckets[i].lock);
}
}
/* See https://en.wikipedia.org/wiki/Jenkins_hash_function (one-at-a-time variant) */
static inline uint32_t
jenkins_hash(char *key, uint32_t key_len)
{
	uint32_t hash = 0;

	for (uint32_t i = 0; i < key_len; i++) {
		hash += key[i];
		hash += hash << 10;
		hash ^= hash >> 6;
	}

	/* Final avalanche */
	hash += hash << 3;
	hash ^= hash >> 11;
	hash += hash << 15;

	return hash;
}
/**
 * Look up a key while holding the bucket's read lock.
 * @returns the stored value pointer, or NULL if absent; *ret_value_len is set
 *          to the stored length on a hit and to 0 on a miss.
 * NOTE(review): the value pointer escapes the lock — callers must ensure the
 * entry is not deleted while the value is in use.
 */
static inline void *
map_get(struct hashmap *map, char *key, uint32_t key_len, uint32_t *ret_value_len)
{
	void *value = NULL;

	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	LOCK_RDLOCK_RW(&bucket->lock);
	for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
		/* Compare lengths before memcmp: the original compared key_len bytes
		 * unconditionally, reading past shorter stored keys and matching keys
		 * that merely share a prefix. */
		if (node->hash == hash && node->key_len == key_len
		    && memcmp(node->key, key, key_len) == 0) {
			value          = node->value;
			*ret_value_len = node->value_len;
			goto DONE;
		}
	}
	*ret_value_len = 0;
DONE:
	LOCK_UNLOCK_RW(&bucket->lock);
	return value;
}
/**
 * Insert a key/value pair if the key is not already present.
 * Holds the bucket's write lock for the duration.
 * @param manage_mvalue true: the map copies value and frees the copy on delete;
 *                      false: the map stores the caller's pointer and the
 *                      caller retains ownership of the memory.
 * @returns true if inserted, false if the key already existed
 */
static inline bool
map_set(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len, bool manage_mvalue)
{
	bool did_set = false;

	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	LOCK_WRLOCK_RW(&bucket->lock);
	/* Duplicate scan: length must match before memcmp to avoid reading past
	 * shorter stored keys (the original memcmp'd key_len bytes unconditionally). */
	for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
		if (node->hash == hash && node->key_len == key_len
		    && memcmp(node->key, key, key_len) == 0)
			goto DONE;
	}

	struct map_node *new_node = (struct map_node *)xmalloc(sizeof(struct map_node));
	*(new_node) = (struct map_node){ .hash      = hash,
	                                 .key       = xmalloc(key_len),
	                                 .key_len   = key_len,
	                                 .value_len = value_len,
	                                 .next      = bucket->head };

	/* Copy Key and (if map-managed) Value */
	memcpy(new_node->key, key, key_len);
	if (manage_mvalue) {
		new_node->value = xmalloc(value_len);
		memcpy(new_node->value, value, value_len);
	} else {
		new_node->value = value;
	}
	new_node->manage_mvalue = manage_mvalue;

	bucket->head = new_node;
	did_set      = true;
DONE:
	LOCK_UNLOCK_RW(&bucket->lock);
	return did_set;
}
/**
 * Remove a key, freeing the key and (if map-managed) the value.
 * Holds the bucket's write lock for the duration.
 * @returns true if a node was deleted, false if the key was not found
 */
static inline bool
map_delete(struct hashmap *map, char *key, uint32_t key_len)
{
	bool did_delete = false;

	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	LOCK_WRLOCK_RW(&bucket->lock);
	struct map_node *prev = NULL;
	for (struct map_node *node = bucket->head; node != NULL; prev = node, node = node->next) {
		/* Length check before memcmp prevents out-of-bounds reads on shorter stored keys */
		if (node->hash != hash || node->key_len != key_len
		    || memcmp(node->key, key, key_len) != 0)
			continue;

		/* Unlink from the chain */
		if (prev == NULL) {
			bucket->head = node->next;
		} else {
			prev->next = node->next;
		}

		free(node->key);
		if (node->manage_mvalue) free(node->value);
		free(node);
		did_delete = true;
		break;
	}
	LOCK_UNLOCK_RW(&bucket->lock);
	return did_delete;
}
/**
 * Update the value stored for an existing key under the bucket's write lock.
 * For a map-managed value the stored buffer is resized and the new bytes are
 * copied in. For a caller-managed value only the pointer and length are
 * updated — the original code realloc'd memory the map did not own and then
 * leaked the reallocated buffer by overwriting the pointer.
 * Panics if the key is not present.
 */
static inline void
map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len)
{
	uint32_t           hash   = MAP_HASH(key, key_len);
	struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];

	LOCK_WRLOCK_RW(&bucket->lock);
	for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
		/* Length check before memcmp prevents out-of-bounds reads on shorter stored keys */
		if (node->hash != hash || node->key_len != key_len
		    || memcmp(node->key, key, key_len) != 0)
			continue;

		node->value_len = value_len;
		if (node->manage_mvalue) {
			void *resized = realloc(node->value, value_len);
			assert(resized);
			node->value = resized;
			memcpy(node->value, value, value_len);
		} else {
			node->value = value;
		}
		goto DONE;
	}
	panic("map_upsert: key not found");
DONE:
	LOCK_UNLOCK_RW(&bucket->lock);
}
/*static inline void
map_destroy(struct hashmap *map) {
for (int i = 0; i < MAP_BUCKET_COUNT; i++) {
LOCK_LOCK_RW(&map->buckets[i].lock);
struct map_node *current = map->buckets[i].head;
struct map_node *next;
while (current != NULL) {
next = current->next;
free(current->key);
if (current->manage_mvalue && current->value != NULL) {
free(current->value);
}
free(current);
current = next;
}
LOCK_UNLOCK_RW(&map->buckets[i].lock);
pthread_mutex_destroy(&map->buckets[i].lock);
// 确保头指针设置为 NULL
map->buckets[i].head = NULL;
}
// 如果 hashmap 是动态分配的,这里还需要释放 hashmap 结构本身
// free(map);
}*/

@ -39,7 +39,14 @@
#warning \
"MODULE_MAX_PENDING_CLIENT_REQUESTS likely exceeds the value in /proc/sys/net/core/somaxconn and thus may be silently truncated";
#endif
#ifdef DEEP_LEARN_SCHDUE
struct MDL
{
int hash_node;
int global_queue_node;
int http_data_size;
};
#endif
struct module {
char name[MODULE_MAX_NAME_LENGTH];
char path[MODULE_MAX_PATH_LENGTH];
@ -78,7 +85,16 @@ struct module {
/* Entry Function to invoke serverless function */
mod_main_fn_t main;
struct module *next_module; /* the next module in the chain */
struct module **next_module; /* the next module in the DAG */
struct module **pre_module; /* the previous module in the DAG */
char **next_module_names; /* the next modules name in the DAG */
uint32_t next_module_count;
uint32_t pre_module_count;
bool runtime_visited; /* used for calculating the estimated time */
uint32_t run_priority;/* Used for prioritizing data fan-in to a node */
#ifdef DEEP_LEARN_SCHDUE
struct MDL mdl;/*save data for deep learn */
#endif
};
/*************************

@ -24,6 +24,9 @@ perf_window_initialize(struct perf_window *self, char* module_name)
self->count = 0;
memset(&self->by_duration, 0, sizeof(struct execution_node) * PERF_WINDOW_BUFFER_SIZE);
memset(&self->by_termination, 0, sizeof(uint16_t) * PERF_WINDOW_BUFFER_SIZE);
#ifdef GET_AVER_TIME
memset(&self->by_duration_for_mdl, 0, sizeof(uint64_t) * PERF_WINDOW_BUFFER_SIZE);
#endif
}
@ -90,12 +93,18 @@ perf_window_add(struct perf_window *self, uint64_t value)
self->by_termination[i] = i;
self->by_duration[i] = (struct execution_node){ .execution_time = value,
.by_termination_idx = i };
#ifdef GET_AVER_TIME
self->by_duration_for_mdl[i] = value;
#endif
}
self->count = PERF_WINDOW_BUFFER_SIZE;
goto done;
}
/* Otherwise, replace the oldest value, and then sort */
#ifdef GET_AVER_TIME
self->by_duration_for_mdl[self->count % PERF_WINDOW_BUFFER_SIZE] = value;
#endif
uint16_t idx_of_oldest = self->by_termination[self->count % PERF_WINDOW_BUFFER_SIZE];
bool check_up = value > self->by_duration[idx_of_oldest].execution_time;
@ -154,6 +163,33 @@ perf_window_get_percentile(struct perf_window *self, int percentile, int precomp
return self->by_duration[size * percentile / 100].execution_time;
}
#ifdef GET_AVER_TIME
/* Returns the arithmetic mean of the execution-time samples currently held in
 * the window's ring buffer (all PERF_WINDOW_BUFFER_SIZE slots once the window
 * has filled), or 0 when no samples have been recorded. Integer division, so
 * the result is truncated. */
static inline uint64_t
perf_window_get_average(struct perf_window *self)
{
	assert(self != NULL);

	int size = self->count;
	if (size == 0) return 0;

	/* Before the window fills, only the first `size` slots hold samples */
	int filled = size < PERF_WINDOW_BUFFER_SIZE ? size : PERF_WINDOW_BUFFER_SIZE;

	uint64_t sum = 0;
	for (int i = 0; i < filled; i++) sum += self->by_duration_for_mdl[i];

	return sum / (uint64_t)filled;
}
#endif
/**
* Returns the total count of executions
* @returns total count

@ -26,6 +26,9 @@ struct execution_node {
struct perf_window {
char name[32];
struct execution_node by_duration[PERF_WINDOW_BUFFER_SIZE];
#ifdef GET_AVER_TIME
uint64_t by_duration_for_mdl[PERF_WINDOW_BUFFER_SIZE];
#endif
uint16_t by_termination[PERF_WINDOW_BUFFER_SIZE];
uint64_t count;
lock_t lock;

@ -298,7 +298,9 @@ static inline int
priority_queue_length_nolock(struct priority_queue *self)
{
assert(self != NULL);
#ifndef DEEP_LEARN_SCHDUE
assert(!listener_thread_is_running());
#endif
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
return self->size;

@ -44,7 +44,7 @@ extern uint32_t runtime_worker_threads_count;
extern int runtime_worker_threads_argument[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern uint64_t runtime_worker_threads_deadline[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern uint64_t runtime_worker_threads_remaining_slack[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern uint64_t runtime_worker_threads_laxity[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern void runtime_initialize(void);
extern void runtime_set_pthread_prio(pthread_t thread, unsigned int nice);
extern void runtime_set_resource_limits_to_max(void);

@ -115,6 +115,24 @@ sandbox_get_srsf_priority(void *element)
return remaining_slack;
};
/* Priority key for the MDL scheduler: the sandbox's remaining slack, aged by
 * the cycles elapsed since its last update (smaller value = more urgent).
 * NOTE(review): unsigned subtraction — if the elapsed cycles exceed
 * remaining_slack this wraps to a huge value, demoting an already-late
 * sandbox to lowest priority; confirm that is intended. */
static inline uint64_t
sandbox_get_mdl_priority(void *element)
{
	struct sandbox *sandbox = (struct sandbox *)element;
	uint64_t now = __getcycles();
	uint64_t remaining_slack_mdl = sandbox->remaining_slack - (now - sandbox->last_update_timestamp);
	return remaining_slack_mdl;
};
/* Priority key for the LLF (least-laxity-first) scheduler: the sandbox's
 * laxity, aged by the cycles elapsed since its last update (smaller value =
 * more urgent).
 * NOTE(review): unsigned subtraction — if the elapsed cycles exceed laxity
 * this wraps to a huge value instead of going negative; confirm intended. */
static inline uint64_t
sandbox_get_llf_priority(void *element)
{
	struct sandbox *sandbox = (struct sandbox *)element;
	uint64_t now = __getcycles();
	uint64_t Laxity_llf = sandbox->laxity - (now - sandbox->last_update_timestamp);
	return Laxity_llf;
};
/**
* Maps a sandbox fd to an underlying host fd
* Returns error condition if the file_descriptor to set does not contain sandbox preopen magic
@ -242,9 +260,26 @@ sandbox_mem_print_perf(struct sandbox *sandbox)
* becomes more intelligent, then peak linear memory size needs to be tracked
* seperately from current linear memory size.
*/
mem_log("%d,%u,%s():%d,%s,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n", worker_thread_idx, sandbox->id,
mem_log("%d,%lu,%s():%d,%s,%u,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n", worker_thread_idx, sandbox->id,
sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state),
sandbox->module->relative_deadline_us, total_time_us, queued_us, initializing_us, runnable_us,
running_us, blocked_us, returned_us, sandbox->linear_memory_size);
}
/**
 * Appends one CSV row for a completed sandbox to the perf log, for use as
 * machine-learning training data: module name, end-to-end time (completion
 * minus arrival, scaled by processor speed — presumably microseconds; confirm
 * units of runtime_processor_speed_MHz), and the MDL feature snapshot
 * (hash-table node count, global queue length, HTTP payload size).
 * Compiled to an unconditional no-op unless LOG_DEEP_LEARN_SCHDUE is defined;
 * only terminal DAG modules (next_module == NULL) are logged.
 */
static inline void
sandbox_MDL_print_perf(struct sandbox *sandbox)
{
#ifndef LOG_DEEP_LEARN_SCHDUE
	return;
#endif
	/* If the log was not defined by an environment variable, early out */
	if (runtime_sandbox_perf_log == NULL) return;
#ifdef DEEP_LEARN_SCHDUE
	if (sandbox->module->next_module == NULL) {
		uint64_t total_time = (sandbox->completion_timestamp - sandbox->request_arrival_timestamp) / runtime_processor_speed_MHz;
		fprintf(runtime_sandbox_perf_log, "%s,%lu,%d,%d,%d\n", sandbox->module->name, total_time,sandbox->module->mdl.hash_node,sandbox->module->mdl.global_queue_node,sandbox->module->mdl.http_data_size);
	}
#endif
}

@ -12,10 +12,17 @@
#include "module.h"
#include "runtime.h"
#include "sandbox_state.h"
#include "lock.h"
struct sandbox_pre_functions_output {
char * previous_function_output;
ssize_t output_length;
uint32_t run_priority;
struct sandbox_pre_functions_output* next;
};
struct sandbox_request {
uint64_t id;
bool request_from_outside; /* true is yes, false is no */
bool request_from_outside; /* true is yes, false is no */
struct module * module;
char * arguments;
int socket_descriptor;
@ -24,10 +31,13 @@ struct sandbox_request {
uint64_t enqueue_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */
uint64_t last_update_timestamp; /* cycles */
uint64_t remaining_slack; /* cycles */
char * previous_function_output;
ssize_t output_length;
ssize_t previous_request_length;
uint64_t remaining_slack; /* cycles */
uint64_t laxity; /* cycles */
struct sandbox_pre_functions_output *pre_functions_output;
pthread_spinlock_t lock;
char * previous_function_output;
ssize_t output_length;
ssize_t previous_request_length;
/*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) * runtime_admissions_granularity / relative deadline (cycles)
@ -73,7 +83,7 @@ sandbox_request_log_allocation(struct sandbox_request *sandbox_request)
static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, bool request_from_outside, ssize_t request_length,
char *arguments, int socket_descriptor, const struct sockaddr *socket_address,
uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp, uint64_t remaining_slack,
uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp, uint64_t remaining_slack, uint64_t laxity,
uint64_t admissions_estimate, char *previous_function_output, ssize_t output_length)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
@ -95,7 +105,11 @@ sandbox_request_allocate(struct module *module, bool request_from_outside, ssize
sandbox_request->previous_request_length = request_length;
sandbox_request->last_update_timestamp = enqueue_timestamp;
sandbox_request->remaining_slack = remaining_slack;
sandbox_request->laxity = laxity;
/*Avoid pointer suspension*/
sandbox_request->pre_functions_output = NULL;
pthread_spin_init(&sandbox_request->lock, PTHREAD_PROCESS_PRIVATE);
/*
* Admissions Control State
* Assumption: an estimate of 0 should have been interpreted as a rejection
@ -107,3 +121,131 @@ sandbox_request_allocate(struct module *module, bool request_from_outside, ssize
return sandbox_request;
}
/**
 * Insert one predecessor's output into the request's sorted output list.
 * The list remains sorted ascending by run_priority (which must be < 6).
 * OWNERSHIP: the node takes ownership of `output`; the buffer is NOT copied
 * and is later freed by concatenate_outputs. Callers must not free it.
 * @param request the sandbox_request whose pre_functions_output list is extended
 * @param output heap buffer with the predecessor's output (ownership transferred)
 * @param output_length the length of the output in bytes
 * @param run_priority the producing module's run_priority (sort key)
 **/
static inline void
pre_functions_output_request_add(struct sandbox_request *request, char *output, ssize_t output_length, uint32_t run_priority)
{
	assert(run_priority < 6);

	if (!output || output_length <= 0) {
		debuglog("output is null or output_length is <= 0");
		return;
	}

	struct sandbox_pre_functions_output **head     = &request->pre_functions_output;
	struct sandbox_pre_functions_output * new_node =
	  (struct sandbox_pre_functions_output *)malloc(sizeof(struct sandbox_pre_functions_output));
	if (!new_node) { panic("Could not allocate memory for new node"); }

	/* BUG FIX: the old code malloc'd a second buffer and then immediately
	 * overwrote the pointer with `output`, leaking output_length bytes on
	 * every call. The list takes ownership of the caller's buffer, so no
	 * intermediate allocation or copy is needed. */
	new_node->previous_function_output = output;
	new_node->output_length            = output_length;
	new_node->run_priority             = run_priority;
	new_node->next                     = NULL;

	/* Sorted insert: new node goes before the first entry with an equal or
	 * greater run_priority */
	if (*head == NULL || (*head)->run_priority >= run_priority) {
		new_node->next = *head;
		*head          = new_node;
	} else {
		struct sandbox_pre_functions_output *current = *head;
		while (current->next && current->next->run_priority < run_priority) { current = current->next; }
		new_node->next = current->next;
		current->next  = new_node;
	}
}
/**
 * Splice every queued predecessor output into a single buffer on the request,
 * in list (run_priority) order, then release the list and the request's spin
 * lock. With more than one predecessor each chunk is prefixed by a 4-byte
 * little-endian-native length; with exactly one predecessor the buffer is
 * moved as-is (no length prefix, no copy).
 * @param request the sandbox_request whose pre_functions_output list is consumed
 **/
static inline void
concatenate_outputs(struct sandbox_request *request) {
	if (request->pre_functions_output == NULL) { return; }

	struct sandbox_pre_functions_output *current = request->pre_functions_output;

	/* Fast path: single predecessor. BUG FIX: the old code memcpy'd the buffer
	 * and leaked the list node, its buffer, any prior previous_function_output,
	 * and never destroyed the spin lock. Steal the buffer instead of copying. */
	if (current->next == NULL) {
		if (request->previous_function_output != NULL) { free(request->previous_function_output); }
		request->previous_function_output = current->previous_function_output;
		request->output_length            = current->output_length;
		free(current);
		request->pre_functions_output = NULL;
		pthread_spin_destroy(&request->lock);
		return;
	}

	/* Each chunk is preceded by 4 bytes carrying its length */
	size_t total_length = 0;
	while (current != NULL) {
		total_length += current->output_length + 4;
		current = current->next;
	}

	char *concatenated_output = (char *)malloc(total_length);
	if (!concatenated_output) {
		panic("Could not allocate memory for concatenated output");
		return;
	}

	char *copy_dest = concatenated_output;
	for (current = request->pre_functions_output; current != NULL; current = current->next) {
		uint32_t copy_length = (uint32_t)current->output_length;
		/* BUG FIX: use memcpy for the prefix; the old *(uint32_t *) store was
		 * an unaligned write (undefined behavior on strict-alignment targets) */
		memcpy(copy_dest, &copy_length, 4);
		copy_dest += 4;
		memcpy(copy_dest, current->previous_function_output, current->output_length);
		copy_dest += current->output_length;
	}

	if (request->previous_function_output != NULL) {
		free(request->previous_function_output);
		request->previous_function_output = NULL;
	}
	request->output_length            = total_length;
	request->previous_function_output = concatenated_output;

	/* Release the consumed list */
	current = request->pre_functions_output;
	while (current != NULL) {
		struct sandbox_pre_functions_output *next = current->next;
		free(current->previous_function_output);
		free(current);
		current = next;
	}
	request->pre_functions_output = NULL;

	pthread_spin_destroy(&request->lock);
}

@ -59,6 +59,9 @@ sandbox_set_as_complete(struct sandbox *sandbox, sandbox_state_t last_state)
/* Terminal State Logging */
sandbox_print_perf(sandbox);
sandbox_mem_print_perf(sandbox);
#ifdef DEEP_LEARN_SCHDUE
sandbox_MDL_print_perf(sandbox);
#endif
sandbox_summarize_page_allocations(sandbox);
/* Do not touch sandbox state after adding to completion queue to avoid use-after-free bugs */

@ -54,6 +54,9 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox->state = SANDBOX_ERROR;
sandbox_print_perf(sandbox);
sandbox_mem_print_perf(sandbox);
#ifdef DEEP_LEARN_SCHDUE
sandbox_MDL_print_perf(sandbox);
#endif
sandbox_summarize_page_allocations(sandbox);
sandbox_free_linear_memory(sandbox);
admissions_control_subtract(sandbox->admissions_estimate);

@ -51,7 +51,8 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand
/* Copy the socket descriptor, address, and arguments of the client invocation */
sandbox->absolute_deadline = sandbox_request->absolute_deadline;
sandbox->remaining_slack = sandbox_request->remaining_slack;
sandbox->remaining_slack = sandbox_request->remaining_slack;
sandbox->laxity = sandbox_request->laxity;
sandbox->last_update_timestamp = sandbox_request->last_update_timestamp;
sandbox->arguments = (void *)sandbox_request->arguments;
sandbox->client_socket_descriptor = sandbox_request->socket_descriptor;

@ -25,11 +25,13 @@ sandbox_set_as_running(struct sandbox *sandbox, sandbox_state_t last_state)
//uint64_t last = sandbox->last_update_timestamp;
//uint64_t last_rs = sandbox->remaining_slack;
sandbox->remaining_slack -= (now - sandbox->last_update_timestamp);
sandbox->laxity -= (now - sandbox->last_update_timestamp);
sandbox->last_update_timestamp = now;
sandbox->runnable_duration += duration_of_last_state;
current_sandbox_set(sandbox);
runtime_worker_threads_deadline[worker_thread_idx] = sandbox->absolute_deadline;
runtime_worker_threads_remaining_slack[worker_thread_idx] = sandbox->remaining_slack;
runtime_worker_threads_laxity[worker_thread_idx] = sandbox->laxity;
//mem_log("time %lu sandbox starts running, request id:%d name %s obj=%p remaining slack %lu, last_rs %lu now %lu last %lu \n", start_execution,
// sandbox->id, sandbox->module->name, sandbox, sandbox->remaining_slack, last_rs, now, last);
/* Does not handle context switch because the caller knows if we need to use fast or slow switched */
@ -49,3 +51,5 @@ sandbox_set_as_running(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox->last_state_change_timestamp = now;
sandbox->state = SANDBOX_RUNNING;
}

@ -54,8 +54,9 @@ struct sandbox {
uint64_t response_timestamp; /* Timestamp when response is sent */
uint64_t completion_timestamp; /* Timestamp when sandbox runs to completion */
uint64_t last_state_change_timestamp; /* Used for bookkeeping of actual execution time */
uint64_t last_update_timestamp; /* Used for bookkeeping timestamp for SRSF */
uint64_t last_update_timestamp; /* Used for bookkeeping timestamp for SRSF && LLF */
uint64_t remaining_slack; /* Cycles */
uint64_t laxity; /* Cycles */
#ifdef LOG_SANDBOX_MEMORY_PROFILE
uint32_t page_allocation_timestamps[SANDBOX_PAGE_ALLOCATION_TIMESTAMP_COUNT];
size_t page_allocation_timestamps_size;

@ -28,7 +28,9 @@ enum SCHEDULER
{
SCHEDULER_FIFO = 0,
SCHEDULER_EDF = 1,
SCHEDULER_SRSF = 2
SCHEDULER_SRSF = 2,
SCHEDULER_MDL = 3,
SCHEDULER_LLF = 4,
};
extern enum SCHEDULER scheduler;
@ -69,6 +71,7 @@ err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
@ -108,8 +111,89 @@ err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
/**
 * MDL variant of the scheduler's dispatch decision.
 * Compares the remaining slack of the local runqueue head against the global
 * request queue head; when the global head is more urgent and this worker is
 * lightly loaded, pulls and allocates a sandbox from the global queue (it
 * lands at the head of the local runqueue). On allocation failure the client
 * is rejected with a 503 and the request is dropped.
 * @returns the sandbox at the head of the local runqueue, or NULL if empty
 */
static inline struct sandbox *
scheduler_MDL_get_next()
{
	/* Remaining slack of the local head; treat an empty runqueue as maximal */
	struct sandbox *head_sandbox = local_runqueue_get_next();
	uint64_t        local_slack  = (head_sandbox == NULL) ? UINT64_MAX : head_sandbox->remaining_slack;
	uint64_t        global_slack = global_request_scheduler_peek();

	struct sandbox_request *pulled = NULL;

	/* Pull from the global queue only when it is more urgent and this worker
	 * has spare capacity */
	if (global_slack < local_slack && (local_workload_count <= 2 || local_runqueue_count == 0)) {
		if (global_request_scheduler_remove_if_earlier(&pulled, local_slack) == 0) {
			assert(pulled != NULL);
			struct sandbox *allocated = sandbox_allocate(pulled);
			if (allocated == NULL) {
				/* Allocation failed: reject the client and discard the request */
				client_socket_send(pulled->socket_descriptor, 503);
				client_socket_close(pulled->socket_descriptor, &pulled->socket_address);
				free(pulled);
				pulled = NULL;
			} else {
				assert(allocated->state == SANDBOX_INITIALIZED);
				sandbox_set_as_runnable(allocated, SANDBOX_INITIALIZED);
			}
		}
	}

	/* Whatever now heads the local runqueue (NULL if empty) */
	return local_runqueue_get_next();
}
/**
 * LLF (Least Laxity First) variant of the scheduler's dispatch decision.
 * Compares the laxity of the local runqueue head against the global request
 * queue head and pulls from the global queue when it is more urgent and this
 * worker is lightly loaded.
 * @returns the sandbox at the head of the local runqueue, or NULL if empty
 */
static inline struct sandbox *
scheduler_LLF_get_next()
{
	/* Get the laxity of the sandbox at the head of the local request queue.
	 * BUG FIX: LLF orders by laxity, but the old code read remaining_slack
	 * here, comparing the local head against the global queue with the wrong
	 * metric. */
	struct sandbox *local        = local_runqueue_get_next();
	uint64_t        local_laxity = local == NULL ? UINT64_MAX : local->laxity;

	struct sandbox_request *request = NULL;

	uint64_t global_laxity = global_request_scheduler_peek();

	/* Try to pull and allocate from the global queue if it is more urgent.
	 * This will be placed at the head of the local runqueue */
	if (global_laxity < local_laxity && (local_workload_count <= 2 || local_runqueue_count == 0)) {
		if (global_request_scheduler_remove_if_earlier(&request, local_laxity) == 0) {
			assert(request != NULL);
			struct sandbox *global = sandbox_allocate(request);
			if (!global) goto err_allocate;

			assert(global->state == SANDBOX_INITIALIZED);
			sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
		}
	}

/* Return what is at the head of the local runqueue or NULL if empty */
done:
	return local_runqueue_get_next();
err_allocate:
	client_socket_send(request->socket_descriptor, 503);
	client_socket_close(request->socket_descriptor, &request->socket_address);
	free(request);
	request = NULL;
	goto done;
}
static inline struct sandbox *
scheduler_fifo_get_next()
{
@ -138,6 +222,7 @@ err_allocate:
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor, &sandbox->client_address);
free(sandbox_request);
sandbox_request = NULL;
err:
sandbox = NULL;
goto done;
@ -153,6 +238,10 @@ scheduler_get_next()
return scheduler_srsf_get_next();
case SCHEDULER_FIFO:
return scheduler_fifo_get_next();
case SCHEDULER_MDL:
return scheduler_MDL_get_next();
case SCHEDULER_LLF:
return scheduler_LLF_get_next();
default:
panic("Unimplemented\n");
}
@ -168,6 +257,12 @@ scheduler_initialize()
case SCHEDULER_SRSF:
global_request_scheduler_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_MDL:
global_request_scheduler_minheap_initialize(SCHEDULER_MDL);
break;
case SCHEDULER_LLF:
global_request_scheduler_minheap_initialize(SCHEDULER_LLF);
break;
case SCHEDULER_FIFO:
global_request_scheduler_deque_initialize();
break;
@ -186,6 +281,12 @@ scheduler_runqueue_initialize()
case SCHEDULER_SRSF:
local_runqueue_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_MDL:
local_runqueue_minheap_initialize(SCHEDULER_MDL);
break;
case SCHEDULER_LLF:
local_runqueue_minheap_initialize(SCHEDULER_LLF);
break;
case SCHEDULER_FIFO:
local_runqueue_list_initialize();
break;
@ -211,8 +312,8 @@ scheduler_preempt(ucontext_t *user_context)
struct sandbox *current = current_sandbox_get();
assert(current != NULL);
assert(current->state == SANDBOX_RUNNING);
if (current-> remaining_slack <= 5000 * runtime_processor_speed_MHz) {
uint64_t RR_least_time = 5000 * runtime_processor_speed_MHz;
if (current-> remaining_slack <= RR_least_time || current->laxity <= RR_least_time) {
return;
}
/* This is for better state-change bookkeeping */
@ -290,6 +391,10 @@ scheduler_print(enum SCHEDULER variant)
return "EDF";
case SCHEDULER_SRSF:
return "SRSF";
case SCHEDULER_MDL:
return "MDL";
case SCHEDULER_LLF:
return "LLF";
}
}

@ -0,0 +1,14 @@
#pragma once
#include <stdlib.h>
#include "likely.h"
#include "panic.h"
/* malloc wrapper that aborts via panic instead of returning NULL.
 * @param size number of bytes to allocate
 * @returns a pointer to the allocation (never NULL) */
static inline void *
xmalloc(size_t size)
{
	void *ptr = malloc(size);
	if (unlikely(ptr == NULL)) panic("xmalloc failed!\n");
	return ptr;
}

@ -42,6 +42,21 @@ admission_info_get_percentile(struct admissions_info *self)
uint64_t estimated_execution = perf_window_get_percentile(&self->perf_window, self->percentile, self->control_index);
return estimated_execution;
}
#ifdef GET_AVER_TIME
/*
 * Reports this module's average execution time, computed over its performance
 * window. The window is read without acquiring a lock.
 * @param self admissions bookkeeping for the module
 * @returns the average execution time of this module
 */
uint64_t
admission_info_get_average(struct admissions_info *self)
{
	return perf_window_get_average(&self->perf_window);
}
#endif
/*
* Adds an execution value to the perf window and calculates and caches and updated estimate
* @param self

@ -8,9 +8,19 @@
#include "scheduler.h"
#include "module.h"
#include "software_interrupt.h"
#include "map.h"
extern uint64_t system_start_timestamp;
lock_t lock;
#ifdef DEEP_LEARN_SCHDUE
_Atomic uint32_t hash_count = 0;
#endif
#define OUTPUT_BUFER_SIZE 1024*5
__thread struct sandbox *worker_thread_current_sandbox = NULL;
__thread struct sandbox_context_cache local_sandbox_context_cache = {
@ -55,6 +65,13 @@ current_sandbox_disable_preemption(struct sandbox *sandbox)
}
}
/* Charges the cycles elapsed since the sandbox's last bookkeeping update
 * against its remaining slack.
 * NOTE(review): despite the name, this does not modify sandbox->laxity, and
 * it does not advance sandbox->last_update_timestamp - a second call with a
 * later `now` re-subtracts the already-charged interval. Confirm whether
 * laxity should also be decremented and the timestamp refreshed here. */
static inline void
current_sandbox_get_newlaxity(struct sandbox *sandbox, uint64_t now)
{
	assert(sandbox);
	sandbox->remaining_slack -= (now - sandbox->last_update_timestamp);
}
/**
* Sandbox execution logic
* Handles setup, request parsing, WebAssembly initialization, function execution, response building and
@ -70,7 +87,30 @@ current_sandbox_start(void)
char *error_message = "";
sandbox_initialize_stdio(sandbox);
struct module * next_module = sandbox->module->next_module;
int next_module_idx = sandbox->module->next_module_count;
static struct hashmap *sandbox_req_map = NULL;
static struct hashmap *sandbox_request_id = NULL;
if (sandbox_req_map == NULL || sandbox_request_id == NULL) {
if(sandbox_req_map == NULL)
{
sandbox_req_map = malloc(sizeof(struct hashmap));
#ifdef DEEP_LEARN_SCHDUE
atomic_init(&hash_count, 0);
#endif
map_init(sandbox_req_map);
}
if(sandbox_request_id == NULL)
{
sandbox_request_id = malloc(sizeof(struct hashmap));
map_init(sandbox_request_id);
}
assert(sandbox_req_map != NULL);
assert(sandbox_request_id != NULL);
}
struct module **next_module = sandbox->module->next_module;
/*
* Add the client fd to epoll if it is the first or last sandbox in the chain because they
@ -118,54 +158,216 @@ current_sandbox_start(void)
*/
goto err;
} else if (next_module != NULL) {
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
assert(next_module_idx);
assert(next_module);
size_t next_module_pre_count = next_module[0]->pre_module_count;
assert(next_module_pre_count);
if (next_module_idx == 1 && next_module_pre_count == 1)
{
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t enqueue_timestamp = __getcycles();
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module, false, sandbox->request_length,
next_module->name, sandbox->client_socket_descriptor,
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
struct module *next_module_node = next_module[0];
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, enqueue_timestamp,
sandbox->remaining_slack, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
#ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) {
free(sandbox_request);
goto err;
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack, sandbox->laxity, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
global_request_scheduler_add(sandbox_request);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
} else if (next_module_idx > 1 && next_module_pre_count == 1)
{
assert(next_module_idx > 1);
for (size_t i = 0; i < next_module_idx; i++)
{
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct module * next_module_node = next_module[i];
assert(next_module_node);
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
#ifdef LOG_DEEP_LEARN_SCHDUE
#ifdef DEEP_LEARN_SCHDUE
/*
* If model parameters need to be trained, then enable the feature;
* otherwise, disable it to avoid unnecessary overhead.
*/
next_module_node->mdl.hash_node = sandbox->module->mdl.hash_node;
next_module_node->mdl.global_queue_node = sandbox->module->mdl.global_queue_node;
next_module_node->mdl.http_data_size = sandbox->module->mdl.http_data_size;
#endif
#endif
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack, sandbox->laxity, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
#ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) {
free(sandbox_request);
goto err;
}
assert(next_sandbox->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(next_sandbox, SANDBOX_INITIALIZED);
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
#else
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
#endif
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
goto done;
} else if (next_module_idx == 1 && next_module_pre_count > 1)
{
static bool lock_flag = true;
if (lock_flag)
{
LOCK_INIT(&lock);
lock_flag = false;
}
/*Before each id is put into the hash table, the key needs to add a "module handle"*/
struct module * next_module_node = next_module[0];
assert(next_module_node);
char *cur_request_id = NULL;
int key_len = snprintf(NULL, 0, "%s%lu", next_module_node->name, sandbox->id) + 1;
cur_request_id = (char *)malloc(key_len);
assert(cur_request_id);
snprintf(cur_request_id, key_len, "%s%lu", next_module_node->name, sandbox->id);
uint32_t ret_value_len;
uint32_t rest_pre_count = 888;
/*calculation the pre_function_out*/
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//debuglog("the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
LOCK_LOCK(&lock);
uint64_t *request_id = (uint64_t *)map_get(sandbox_request_id, cur_request_id, strlen(cur_request_id), &ret_value_len);
bool mapflag = false;
if (!request_id) {
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
#ifdef LOG_DEEP_LEARN_SCHDUE
#ifdef DEEP_LEARN_SCHDUE
/*
* If model parameters need to be trained, then enable the feature;
* otherwise, disable it to avoid unnecessary overhead.
*/
next_module_node->mdl.hash_node = sandbox->module->mdl.hash_node;
next_module_node->mdl.global_queue_node = sandbox->module->mdl.global_queue_node;
next_module_node->mdl.http_data_size = sandbox->module->mdl.http_data_size;
#endif
#endif
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack,sandbox->laxity, true, NULL, 0);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
uint32_t module_pre_count = next_module_pre_count;
module_pre_count--;
assert(module_pre_count);
if(!map_set(sandbox_request_id, cur_request_id, strlen(cur_request_id), &module_pre_count, sizeof(uint32_t), true)) panic("the map of sandbox_request_id is NULL\n");
if(!map_set(sandbox_req_map, cur_request_id, strlen(cur_request_id), sandbox_request, sizeof(struct sandbox_request *), false)) panic("the map of sandbox_request is NULL\n");
mapflag = true;
#ifdef DEEP_LEARN_SCHDUE
atomic_fetch_add(&hash_count, 1);
#endif
}
LOCK_UNLOCK(&lock);
struct sandbox_request *sandbox_request = map_get(sandbox_req_map, cur_request_id, strlen(cur_request_id), &ret_value_len);
if(!sandbox_request) panic("the map of sandbox_request is NULL\n");
if (mapflag)
{
pre_functions_output_request_add(sandbox_request, pre_func_output, output_length, sandbox->module->run_priority);
}else{
pthread_spin_lock(&sandbox_request->lock);
pre_functions_output_request_add(sandbox_request, pre_func_output, output_length, sandbox->module->run_priority);
if (!request_id) {
panic("Request ID not found or invalid\n");
}else {
rest_pre_count = *request_id;
}
if(rest_pre_count == 888) panic("the rest_pre_count is not get requst_id\n");
rest_pre_count--;
if (rest_pre_count != 0)
{
map_upsert(sandbox_request_id, cur_request_id, strlen(cur_request_id), &rest_pre_count, sizeof(uint32_t));
}else{
concatenate_outputs(sandbox_request);
uint64_t enqueue_timestamp = __getcycles();
sandbox_request->enqueue_timestamp = enqueue_timestamp;
global_request_scheduler_add(sandbox_request);
map_delete(sandbox_req_map, cur_request_id, strlen(cur_request_id));
map_delete(sandbox_request_id, cur_request_id, strlen(cur_request_id));
#ifdef DEEP_LEARN_SCHDUE
atomic_fetch_sub(&hash_count, 1);
#endif
}
pthread_spin_unlock(&sandbox_request->lock);
}
free(cur_request_id);
cur_request_id = NULL;
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
goto done;
}else
{
error_message = "the strcuture of DAG is not supported\n";
goto err;
}
} else {
/* Retrieve the result, construct the HTTP response, and send to client */
if (sandbox_send_response(sandbox) < 0) {
@ -173,7 +375,7 @@ current_sandbox_start(void)
goto err;
};
http_total_increment_2xx();
//http_total_increment_2xx();
sandbox->response_timestamp = __getcycles();

@ -91,3 +91,8 @@ global_request_scheduler_peek()
{
return global_request_scheduler.peek_fn();
}
/**
 * Reports how many requests are queued in the global request scheduler.
 * @returns queue length as reported by the active variant's size_fn
 */
int
global_request_scheduler_size()
{
	int queued = global_request_scheduler.size_fn();
	return queued;
}

@ -64,6 +64,12 @@ global_request_scheduler_minheap_peek(void)
return priority_queue_peek(global_request_scheduler_minheap);
}
/* Minheap variant of size_fn: number of queued sandbox requests. */
static int
global_request_scheduler_minheap_size(void)
{
	int length = priority_queue_length(global_request_scheduler_minheap);
	return length;
}
uint64_t
sandbox_request_get_priority_fn(void *element)
{
@ -80,6 +86,25 @@ sandbox_request_get_priority_srsf_fn(void *element)
return remaining_slack;
};
/**
 * MDL priority: remaining slack minus the cycles elapsed since the request's
 * last bookkeeping update. Smaller values are more urgent (min-heap).
 * @param element a struct sandbox_request *
 * @returns the request's current remaining slack in cycles (0 if overdue)
 */
uint64_t
sandbox_request_get_priority_mdl_fn(void *element)
{
	struct sandbox_request *sandbox_request = (struct sandbox_request *)element;
	uint64_t                now             = __getcycles();
	uint64_t                elapsed         = now - sandbox_request->last_update_timestamp;
	/* BUG FIX: the subtraction is unsigned; once elapsed exceeded the
	 * remaining slack the old code wrapped to a huge value, sending the most
	 * urgent (overdue) request to the back of the min-heap. Saturate at 0 so
	 * overdue requests keep the highest priority. */
	if (elapsed >= sandbox_request->remaining_slack) return 0;
	return sandbox_request->remaining_slack - elapsed;
};
/**
 * LLF priority: laxity minus the cycles elapsed since the request's last
 * bookkeeping update. Smaller values are more urgent (min-heap).
 * @param element a struct sandbox_request *
 * @returns the request's current laxity in cycles (0 if overdue)
 */
uint64_t
sandbox_request_get_priority_llf_fn(void *element)
{
	struct sandbox_request *sandbox_request = (struct sandbox_request *)element;
	uint64_t                now             = __getcycles();
	uint64_t                elapsed         = now - sandbox_request->last_update_timestamp;
	/* BUG FIX: unsigned underflow - an overdue request wrapped to a huge
	 * value and was demoted to the back of the min-heap. Saturate at 0 so it
	 * stays the most urgent. */
	if (elapsed >= sandbox_request->laxity) return 0;
	return sandbox_request->laxity - elapsed;
};
/**
* Initializes the variant and registers against the polymorphic interface
*/
@ -90,13 +115,18 @@ global_request_scheduler_minheap_initialize(enum SCHEDULER scheduler)
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_fn);
} else if (scheduler == SCHEDULER_SRSF) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_srsf_fn);
} else if (scheduler == SCHEDULER_MDL) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_mdl_fn);
} else if (scheduler == SCHEDULER_LLF) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_llf_fn);
}
struct global_request_scheduler_config config = {
.add_fn = global_request_scheduler_minheap_add,
.remove_fn = global_request_scheduler_minheap_remove,
.remove_if_earlier_fn = global_request_scheduler_minheap_remove_if_earlier,
.peek_fn = global_request_scheduler_minheap_peek
.peek_fn = global_request_scheduler_minheap_peek,
.size_fn = global_request_scheduler_minheap_size
};
global_request_scheduler_initialize(&config);

@ -7,8 +7,15 @@
#include "generic_thread.h"
#include "listener_thread.h"
#include "runtime.h"
#include "scheduler.h"
#ifdef DEEP_LEARN_SCHDUE
extern _Atomic uint32_t hash_count;
#endif
extern uint64_t system_start_timestamp;
extern enum SCHEDULER scheduler;
/*When reading the json file, the size has been determined at module.c JSON_MAX_ELEMENT_COUNT*/
const int QUEUE_SIZE = 36;
/*
* Descriptor of the epoll instance used to monitor the socket descriptors of registered
* serverless modules. The listener cores listens for incoming client requests through this.
@ -100,7 +107,6 @@ listener_thread_main(void *dummy)
*/
assert(descriptor_count > 0);
uint64_t request_arrival_timestamp = __getcycles();
for (int i = 0; i < descriptor_count; i++) {
/* Check Event to determine if epoll returned an error */
if ((epoll_events[i].events & EPOLLERR) == EPOLLERR) {
@ -175,22 +181,76 @@ listener_thread_main(void *dummy)
continue;
}
#ifdef DEEP_LEARN_SCHDUE
/*Get HTTP buffer size*/
int content_length = 0;
module->mdl.global_queue_node = global_request_scheduler_size();
module->mdl.hash_node = hash_count;
char buffer[1024];
ssize_t nbytes_peeked = recv(client_socket, buffer, sizeof(buffer), MSG_PEEK);
if (nbytes_peeked <= 0){
content_length = 5090;
} else{
buffer[sizeof(buffer)-1] = '\0';
const char *content_length_str = "Content-Length: ";
char *start = strstr(buffer, content_length_str);
if (start) {
start += strlen(content_length_str);
while (*start == ' ') start++;
content_length = atoi(start);
//debuglog("Content-Length: %d\n", content_length);
}else{
content_length = 5090;
}
}
module->mdl.http_data_size = content_length;
#endif
/* get total estimated execution time */
uint64_t estimated_execution_time = admission_info_get_percentile(&module->admissions_info);
struct module * next_module = module;
while(next_module) {
estimated_execution_time += admission_info_get_percentile(&next_module->admissions_info);
next_module = next_module->next_module;
}
uint64_t estimated_execution_time = 0;
int front = 0, rear = 0;
struct module *queue[QUEUE_SIZE] = {NULL};
queue[rear++] = module;
while (rear != front)
{
struct module *current_module = queue[front++];
if (scheduler == SCHEDULER_SRSF || scheduler == SCHEDULER_EDF || scheduler == SCHEDULER_LLF)
{
estimated_execution_time += admission_info_get_percentile(&current_module->admissions_info);
}else if (scheduler == SCHEDULER_MDL ){
#ifdef GET_AVER_TIME
estimated_execution_time += admission_info_get_average(&current_module->admissions_info);
#else
estimated_execution_time += admission_info_get_percentile(&current_module->admissions_info);
#endif
}
for (int i = 0; i < current_module->next_module_count; i++) {
if (current_module->next_module[i] != NULL && !current_module->next_module[i]->runtime_visited)
{
queue[rear++] = current_module->next_module[i];
current_module->next_module[i]->runtime_visited = true;
}
}
assert(rear <= QUEUE_SIZE);
assert(front <= QUEUE_SIZE);
}
/*Recover the flags of the module here, so that it can be accessed next time.*/
for (int i = 0; i < QUEUE_SIZE; i++) {
if (queue[i] != NULL) {
struct module *current_module = queue[i];
current_module->runtime_visited = false;
}
}
/* Adding system start timestamp to avoid negative remaining slack in the following update. They are all cycles */
uint64_t remaining_slack = system_start_timestamp + module->relative_deadline - estimated_execution_time;
uint64_t remaining_slack = system_start_timestamp + module->relative_deadline - estimated_execution_time;
uint64_t request_arrival_timestamp = __getcycles();
/* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, true, 0, module->name, client_socket,
sandbox_request_allocate(module, true, 0, module->name, client_socket,
(const struct sockaddr *)&client_address,
request_arrival_timestamp, request_arrival_timestamp,remaining_slack,
request_arrival_timestamp, request_arrival_timestamp,remaining_slack, remaining_slack,
work_admitted, NULL, 0);
/* Add to the Global Sandbox Request Scheduler */

@ -82,6 +82,13 @@ local_runqueue_minheap_initialize(enum SCHEDULER scheduler)
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_priority);
} else if (scheduler == SCHEDULER_SRSF) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_srsf_priority);
} else if (scheduler == SCHEDULER_MDL) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_mdl_priority);
} else if (scheduler == SCHEDULER_LLF) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_llf_priority);
} else{
panic("Invalid scheduler type %d\n", scheduler);
}
/* Register Function Pointers for Abstract Scheduling API */

@ -186,6 +186,11 @@ runtime_configure()
scheduler = SCHEDULER_FIFO;
} else if (strcmp(scheduler_policy, "SRSF") == 0) {
scheduler = SCHEDULER_SRSF;
} else if (strcmp(scheduler_policy, "MDL") == 0) {
scheduler = SCHEDULER_MDL;
} else if (strcmp(scheduler_policy, "LLF") == 0)
{
scheduler = SCHEDULER_LLF;
} else {
panic("Invalid scheduler policy: %s. Must be {EDF|FIFO}\n", scheduler_policy);
}
@ -197,7 +202,7 @@ runtime_configure()
if (strcmp(sigalrm_policy, "BROADCAST") == 0) {
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
} else if (strcmp(sigalrm_policy, "TRIAGED") == 0) {
if (unlikely(scheduler != SCHEDULER_EDF && scheduler != SCHEDULER_SRSF)) panic("triaged sigalrm handlers are only valid with EDF and SRSF\n");
if (unlikely(scheduler != SCHEDULER_EDF && scheduler != SCHEDULER_SRSF && scheduler != SCHEDULER_MDL && scheduler != SCHEDULER_LLF)) panic("triaged sigalrm handlers are only valid with EDF and SRSF\n");
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_TRIAGED;
} else {
panic("Invalid sigalrm policy: %s. Must be {BROADCAST|TRIAGED}\n", sigalrm_policy);
@ -226,8 +231,13 @@ runtime_configure()
printf("\tSandbox Performance Log: %s\n", runtime_sandbox_perf_log_path);
runtime_sandbox_perf_log = fopen(runtime_sandbox_perf_log_path, "w");
if (runtime_sandbox_perf_log == NULL) { perror("sandbox perf log"); }
fprintf(runtime_sandbox_perf_log, "threadid,id,function,state,deadline,actual,queued,initializing,runnable,"
#ifdef LOG_DEEP_LEARN_SCHDUE
fprintf(runtime_sandbox_perf_log, "module.name, total_time,module.hash_node,modul.global_queue_node,module.http_data_size\n");
#else
fprintf(runtime_sandbox_perf_log, "threadid,id,function,state,deadline,actual,queued,initializing,runnable,"
"running,blocked,returned,memory\n");
#endif
} else {
printf("\tSandbox Performance Log: Disabled\n");
}

@ -93,6 +93,7 @@ mem_log(char const * fmt, ...)
} else {
/* Write Success */
log_obj.offset += n;
// dump_log_to_file(log_obj);
}
}

@ -19,6 +19,7 @@
const int JSON_MAX_ELEMENT_COUNT = 16;
const int JSON_MAX_ELEMENT_SIZE = 1024;
const int MODULE_MAX_COUNT = 36;
/*************************
* Private Static Inline *
@ -374,7 +375,10 @@ module_new_from_json(char *file_name)
int module_count = 0;
char *request_headers = NULL;
char *reponse_headers = NULL;
struct module *tail_module = NULL;
struct module **nodes = malloc( MODULE_MAX_COUNT * sizeof(struct module*));
if (nodes == NULL) {
panic("Memory allocation failed for nodes array\n");
}
for (int i = 0; i < total_tokens; i++) {
assert(tokens[i].type == JSMN_OBJECT);
@ -397,21 +401,25 @@ module_new_from_json(char *file_name)
}
memset(reponse_headers, 0, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT);
uint32_t next_module_count = 0;
uint32_t pre_module_count = 0;
int32_t request_size = 0;
int32_t response_size = 0;
int32_t argument_count = 0;
uint32_t priority = 1;
uint32_t port = 0;
uint32_t relative_deadline_us = 0;
uint32_t expected_execution_us = 0;
int admissions_percentile = 50;
bool is_active = false;
bool is_tail_module = false;
// bool is_tail_module = false;
int32_t request_count = 0;
int32_t response_count = 0;
int j = 1;
int ntoks = 2 * tokens[i].size;
char request_content_type[HTTP_MAX_HEADER_VALUE_LENGTH] = { 0 };
char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH] = { 0 };
char **next_module_names = NULL;
for (; j < ntoks;) {
int ntks = 1;
@ -440,11 +448,38 @@ module_new_from_json(char *file_name)
if (buffer < 0 || buffer > 65535)
panic("Expected port between 0 and 65535, saw %d\n", buffer);
port = buffer;
}else if (strcmp(key, "priority") == 0)
{
priority = atoi(val);
if (priority < 0 || priority > 5)
panic("Expected priority between 0 and 5, saw %d\n", priority);
} else if (strcmp(key, "argsize") == 0) {
// Validate in expected range 0..127. Unclear if 127 is an actual hard limit
argument_count = atoi(val);
if (argument_count < 0 || argument_count > 127)
panic("Expected argument count between 0 and 127, saw %d\n", argument_count);
} else if (strcmp(key, "pre_module_count") == 0)
{
pre_module_count = atoi(val);
if (pre_module_count < 0)
panic("Expected pre_module_count to be nonnegative, saw %d\n", pre_module_count);
} else if (strcmp(key, "next_modules") == 0)
{
assert(tokens[i + j + 1].type == JSMN_ARRAY);
request_count = tokens[i + j + 1].size;
ntks += request_count;
ntoks += request_count;
next_module_names = malloc(request_count * sizeof(char *));
next_module_count = request_count;
int array_index = 0;
for (int k = 1; k <= request_count; k++) {
jsmntok_t *g = &tokens[i + j + k + 1];
int name_length = g->end - g->start;
next_module_names[array_index] = malloc(name_length + 1);
strncpy(next_module_names[array_index], file_buffer + g->start, name_length);
next_module_names[array_index][name_length] = '\0';
array_index++;
}
} else if (strcmp(key, "active") == 0) {
assert(tokens[i + j + 1].type == JSMN_PRIMITIVE);
if (val[0] == 't') {
@ -454,15 +489,15 @@ module_new_from_json(char *file_name)
} else {
panic("Expected active key to be a JSON boolean, was %s\n", val);
}
} else if (strcmp(key, "tail-module") == 0) {
assert(tokens[i + j + 1].type == JSMN_PRIMITIVE);
if (val[0] == 't') {
is_tail_module = true;
} else if (val[0] == 'f') {
is_tail_module = false;
} else {
panic("Expected tail_module key to be a JSON boolean, was %s\n", val);
}
// } else if (strcmp(key, "tail-module") == 0) {
// assert(tokens[i + j + 1].type == JSMN_PRIMITIVE);
// if (val[0] == 't') {
// is_tail_module = true;
// } else if (val[0] == 'f') {
// is_tail_module = false;
// } else {
// panic("Expected tail_module key to be a JSON boolean, was %s\n", val);
// }
} else if (strcmp(key, "relative-deadline-us") == 0) {
int64_t buffer = strtoll(val, NULL, 10);
if (buffer < 0 || buffer > (int64_t)RUNTIME_RELATIVE_DEADLINE_US_MAX)
@ -566,28 +601,92 @@ module_new_from_json(char *file_name)
relative_deadline_us, port, request_size, response_size,
admissions_percentile, expected_execution_us);
if (module == NULL) goto module_new_err;
if(next_module_count == 0)
{
module->next_module_names = NULL;
}else
{
module->next_module_names = malloc(next_module_count * sizeof(char*));
if (module->next_module_names == NULL) panic("Failed to allocate memory for next_module_names");
for (int i = 0; i < next_module_count; i++) {
module->next_module_names[i] = strdup(next_module_names[i]);
if (module->next_module_names[i] == NULL) {
panic("Memory allocation failed for next_module_names[%d].\n", i);
}
}
}
module->next_module_count = next_module_count;
module->pre_module_count = pre_module_count;
module->next_module = NULL;
module->pre_module = NULL;
module->runtime_visited = false;
module->run_priority = priority;
#ifdef DEEP_LEARN_SCHDUE
module->mdl.hash_node = 0;
module->mdl.global_queue_node = 0;
module->mdl.http_data_size = 0;
#endif
assert(module);
if (tail_module != NULL) { tail_module->next_module = module; }
tail_module = module;
tail_module->next_module = NULL;
/* if this is the tail module, reset tail_module to NULL to build another new chain */
if (is_tail_module) {
tail_module = NULL;
}
module_set_http_info(module, request_count, request_headers, request_content_type,
response_count, reponse_headers, response_content_type);
nodes[module_count] = module;
module_count++;
}
free(request_headers);
free(reponse_headers);
for (int i = 0; i < next_module_count; i++) {
free(next_module_names[i]);
}
free(next_module_names);
}
if (module_count == 0) panic("%s contained no active modules\n", file_name);
for (int i = 0; i < module_count; i++) {
assert(nodes[i]);
uint32_t next_module_count = nodes[i]->next_module_count;
if (next_module_count == 0) continue;
nodes[i]->next_module = (struct module**) malloc( next_module_count * sizeof(struct module*));
if (nodes[i]->next_module == NULL) panic("Failed to allocate memory for next_module");
for (int next_module_mem = 0; next_module_mem < next_module_count; next_module_mem++) nodes[i]->next_module[next_module_mem] = NULL;
for (int j = 0; j < next_module_count; j++) {
for (int m = i + 1; m < module_count; m++) {
if (strncmp(nodes[i]->next_module_names[j], nodes[m]->name, MODULE_MAX_NAME_LENGTH) == 0) {
assert(nodes[m]);
uint32_t precount = nodes[m]->pre_module_count;
assert(precount);
if (nodes[m]->pre_module == NULL) {
nodes[m]->pre_module = (struct module**) malloc(precount * sizeof(struct module*));
if (nodes[m]->pre_module == NULL) panic("Failed to allocate memory for pre_module");
for (int k = 0; k < precount; k++) {
nodes[m]->pre_module[k] = NULL;
}
}
nodes[i]->next_module[j] = nodes[m];
int preflag = 0;
for (; preflag < precount; preflag++) {
if(nodes[m]->pre_module[preflag] == NULL) break;
}
nodes[m]->pre_module[preflag] = nodes[i];
break;
}
}
}
}
/*Avoid module memory copy overhead*/
for(int i = 0; i < module_count; i++) {
for (int j = 0; j < nodes[i]->next_module_count; j++)
{
free(nodes[i]->next_module_names[j]);
}
nodes[i]->next_module_names = NULL;
}
free(nodes);
nodes = NULL;
#ifdef LOG_MODULE_LOADING
debuglog("Loaded %d module%s!\n", module_count, module_count > 1 ? "s" : "");
#endif

@ -33,6 +33,7 @@ int runtime_worker_threads_argument[RUNTIME_WORKER_THREAD_CORE_COUNT] = {
/* The active deadline of the sandbox running on each worker thread */
uint64_t runtime_worker_threads_deadline[RUNTIME_WORKER_THREAD_CORE_COUNT] = { UINT64_MAX };
uint64_t runtime_worker_threads_remaining_slack[RUNTIME_WORKER_THREAD_CORE_COUNT] = { UINT64_MAX };
uint64_t runtime_worker_threads_laxity[RUNTIME_WORKER_THREAD_CORE_COUNT] = { UINT64_MAX };
/******************************************
* Shared Process / Listener Thread Logic *
@ -41,7 +42,7 @@ uint64_t runtime_worker_threads_remaining_slack[RUNTIME_WORKER_THREAD_CORE_COUNT
void
runtime_cleanup()
{
if (runtime_sandbox_perf_log != NULL) fflush(runtime_sandbox_perf_log);
//if (runtime_sandbox_perf_log != NULL) fflush(runtime_sandbox_perf_log);
software_interrupt_deferred_sigalrm_max_print();
exit(EXIT_SUCCESS);

@ -151,7 +151,6 @@ sandbox_allocate(struct sandbox_request *sandbox_request)
/* Set state to initializing */
sandbox_set_as_initialized(sandbox, sandbox_request, now);
free(sandbox_request);
done:
return sandbox;
@ -186,6 +185,7 @@ sandbox_free(struct sandbox *sandbox)
assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE);
int rc;
//printf("ID %lu of sandbox %s removing sandbox from queue\n",sandbox->id, sandbox->module->name);
if (sandbox->previous_function_output != NULL) {
free(sandbox->previous_function_output);
sandbox->previous_function_output = NULL;

@ -94,6 +94,16 @@ sigalrm_propagate_workers(siginfo_t *signal_info)
uint64_t global_slack = global_request_scheduler_peek();
if (global_slack < local_remaining_slack) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
} else if (scheduler == SCHEDULER_MDL) {
uint64_t local_remaining_slack = runtime_worker_threads_remaining_slack[i];
uint64_t global_slack = global_request_scheduler_peek();
if (global_slack < local_remaining_slack) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
} else if (scheduler == SCHEDULER_LLF) {
uint64_t local_Laxity = runtime_worker_threads_laxity[i];
uint64_t global_Laxity = global_request_scheduler_peek();
if (global_Laxity < local_Laxity) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
}
}
case RUNTIME_SIGALRM_HANDLER_BROADCAST: {
@ -253,12 +263,12 @@ done:
void
software_interrupt_arm_timer(void)
{
if (!runtime_preemption_enabled) return;
/* if preemption disabled, broadcast sig alarm to all other threads to record the queuelength info */
/* if preemption disabled, broadcast sig alarm to all other threads to record the queuelength info */
if (!runtime_preemption_enabled) {
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
return;
}
struct itimerval interval_timer;
memset(&interval_timer, 0, sizeof(struct itimerval));

@ -2,16 +2,17 @@ include Makefile.inc
#TESTS=fibonacci fibonacci2 fibonacci3 big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS=fibonacci big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS2=fibonacciadd mem work3 picinpic noop fibonacci2 fibchain
TESTSRT=$(TESTS:%=%_rt)
.PHONY: all clean rttests tinyekf cifar10 gocr sod
TESTSRT2=$(TESTS2:%=%_rt)
.PHONY: all clean rttests tinyekf cifar10 gocr sod add
all: rttests tinyekf cifar10 gocr sod
@echo "Test Compilation done!"
rttests: $(TESTSRT)
add: $(TESTSRT2)
clean:
@echo "Cleaning Test Applications"
@ -42,6 +43,7 @@ sod:
@make dir samples.so -C ./sod/
@cp ./sod/bin/license_plate_detection.so ${SLEDGE_BIN_DIR}/lpd_wasm.so
@cp ./sod/bin/resize_image.so ${SLEDGE_BIN_DIR}/resize_wasm.so
# @cp ./sod/bin/reverse.so ${SLEDGE_BIN_DIR}/reverse_wasm.so
C-Image-Manip:
@echo "Making and Installing pngPlay"

File diff suppressed because one or more lines are too long

@ -0,0 +1 @@
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

@ -0,0 +1,15 @@
#!/bin/bash
# Start the sledge runtime in the background, drive it with the hey compare
# workload, then shut the runtime down.

function usage {
	echo "$0 [cpu-log]"
	exit 1
}

path="/home/hai/sledge-old/runtime/tests"

#test work1k 2000
server_log_file="edf_1k.log"
# Quote all expansions so paths/filenames with spaces cannot word-split.
"$path/compare/start_compare.sh" "$server_log_file" >/dev/null 2>&1 &
echo "sledge is running"
./hey_test_compare.sh 60 22

"$path/kill_sledge.sh"

File diff suppressed because one or more lines are too long

@ -0,0 +1,24 @@
# Fan out two hey load-generation runs (1-byte and 410-byte payloads) in
# parallel and wait for BOTH to finish before reporting success.
function usage {
	echo "$0 [duration(s)] [concurrency]"
	exit 1
}

if [ $# != 2 ] ; then
	usage
	exit 1;
fi

duration=$1
concurrency=$2

f1="1byte_${concurrency}.txt"
echo "$f1"
f2="0.4k_${concurrency}.txt"

./test_8c.sh "$f1" "$duration" "$concurrency" 1byte_file 10000 2>&1 &
pid1=$!
./test_8c.sh "$f2" "$duration" "$concurrency" 410byte_file 10004 2>&1 &
pid4=$!

# BUG FIX: the script waited on $pid2, which is never assigned, so it could
# exit before the 410-byte run finished. Wait on $pid4 instead, and use plain
# `wait` for portability (`wait -f` requires a recent bash).
wait "$pid1"
wait "$pid4"

printf "[OK]\n"

@ -0,0 +1,29 @@
#!/bin/bash
# Launch sledgert with the EDF scheduler and write the sandbox perf log to
# the file named by the single argument.

function usage {
	echo "$0 [perf output file, chain_function_perf.log or single_function_perf.log or opt_function_perf.log]"
	exit 1
}

if [ $# != 1 ] ; then
	usage
	exit 1;
fi

output=$1

# Absolute path of the repository root (three levels above this script).
declare project_path="$(
	cd "$(dirname "$0")/../../.."
	pwd
)"
echo "$project_path"
# $(...) instead of backticks; quote every expansion against word-splitting.
path=$(pwd)
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2400
export SLEDGE_SCHEDULER=EDF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SANDBOX_PERF_LOG="$path/$output"
echo "$SLEDGE_SANDBOX_PERF_LOG"

# BUG FIX: an unchecked cd would run sledgert from the wrong directory.
cd "$project_path/runtime/bin" || exit 1
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/compare/test_work1k.json

@ -0,0 +1,22 @@
# Run one hey load-generation pass against a single endpoint and save the
# report to the named output file.
function usage {
	echo "$0 [output file] [duration(s)] [concurrency] [image file] [port]"
	exit 1
}

if [ $# != 5 ] ; then
	echo "input parameters are not 5"
	usage
	exit 1;
fi

output=$1
duration=$2
concurrency=$3
image=$4
port=$5

echo "hey -disable-compression -disable-keepalive -disable-redirects -c $concurrency -z $duration\s -t 0 -m GET -D "$image" "http://127.0.0.1:$port" > $output"
#hey -disable-compression -disable-keepalive -disable-redirects -c 1 -q $rps -z $duration\s -cpus 1 -t 0 -m GET -D "$image" "http://10.10.1.1:$port"
#hey -disable-compression -disable-keepalive -disable-redirects -c $concurrency -z 20s -t 0 -m GET -D "$image" "http://10.10.1.1:$port"
# Quote all expansions; "${duration}s" replaces the fragile $duration\s form.
hey -disable-compression -disable-keepalive -disable-redirects -n 5000 -c "$concurrency" -z "${duration}s" -t 0 -m GET -D "$image" "http://127.0.0.1:$port" > "$output"

@ -0,0 +1,136 @@
{
"active": true,
"name": "work1k1_1",
"path": "work1k_wasm.so",
"port": 10000,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work1k1_2", "work1k1_3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k1_2",
"path": "work1k_wasm.so",
"port": 10001,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work1k1_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k1_3",
"path": "work1k_wasm.so",
"port": 10002,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 2,
"pre_module_count": 1,
"next_modules": ["work1k1_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k1_4",
"path": "work1k_wasm.so",
"port": 10003,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain",
},
{
"active": true,
"name": "work1k2_1",
"path": "work1k_wasm.so",
"port": 10004,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work1k2_2", "work1k2_3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k2_2",
"path": "work1k_wasm.so",
"port": 10005,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work1k2_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k2_3",
"path": "work1k_wasm.so",
"port": 10006,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 2,
"pre_module_count": 1,
"next_modules": ["work1k2_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k2_4",
"path": "work1k_wasm.so",
"port": 10007,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plaini",
},

@ -0,0 +1,85 @@
{
"active": true,
"name": "fibona1",
"path": "fibchain_wasm.so",
"port": 10000,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["fibona2"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona2",
"path": "fibchain_wasm.so",
"port": 10002,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona3",
"path": "fibchain_wasm.so",
"port": 10003,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona4",
"path": "fibchain_wasm.so",
"port": 10004,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona5"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona5",
"path": "fibchain_wasm.so",
"port": 10005,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}

@ -0,0 +1,20 @@
#include <stdio.h>
#include <stdlib.h>
/* Fibonacci: fib(0)=0, fib(1)=1, fib(n)=fib(n-1)+fib(n-2). Iterative form
 * computes exactly the same sequence as the recursive definition. */
unsigned long int
fib(unsigned long int n)
{
	if (n <= 1) return n;

	unsigned long int prev = 0;
	unsigned long int curr = 1;
	for (unsigned long int i = 2; i <= n; i++) {
		unsigned long int next = prev + curr;
		prev = curr;
		curr = next;
	}
	return curr;
}
/*
 * Reads one unsigned integer n from stdin, computes fib(n), and prints the
 * result followed by a newline. Returns 0 on success, 1 on malformed input.
 */
int
main(int argc, char **argv)
{
	unsigned long n = 0;

	/* BUG FIX: the scanf return value was ignored, so on malformed input
	 * n silently stayed 0 and the program printed fib(0). Fail loudly. */
	if (scanf("%lu", &n) != 1) {
		fprintf(stderr, "expected an unsigned integer on stdin\n");
		return 1;
	}

	printf("%lu\n", fib(n));
	return 0;
}

@ -1,40 +1,30 @@
#include <stdio.h>
#include <stdlib.h>
// #include "get_time.h"
/* Naive doubly-recursive Fibonacci (fib(0)=0, fib(1)=1); exponential-time
 * on purpose — this file is a CPU-burning test workload. */
unsigned long int
fib(unsigned long int n)
{
	if (n <= 1) return n;
	return fib(n - 1) + fib(n - 2);
}
/*
int
main(int argc, char **argv)
{
unsigned long r = 0;
//scanf("%s", recv_buf);
r = fib(30);
printf("%lu\n", r);
return 0;
}*/
/*
 * Workload entry point: reads the whole request body from stdin, parses an
 * unsigned integer n out of it, applies fib() three times, and prints both
 * the number of bytes read and the final value.
 */
int
main(int argc, char **argv)
{
	/* Zero-initialized so the buffer is always NUL-terminated for sscanf. */
	char recv_buf[1024 * 1024] = {0};
	unsigned long r = 0;
	unsigned long n = 0;

	/* Leave one byte so the buffer stays NUL-terminated even when full. */
	r = read(0, recv_buf, sizeof(recv_buf) - 1);

	/* BUG FIX: the original called scanf("%lu", &n) here, but the read()
	 * above had already drained stdin, so scanf always failed and n stayed
	 * 0 (every run computed fib(0)). Parse n from the bytes we actually
	 * received instead. */
	if (sscanf(recv_buf, "%lu", &n) != 1) n = 0;

	printf("%lu\n", r);

	for (int i = 0; i < 3; i++) {
		n = fib(n);
	}
	printf("%lu\n", n);
	return 0;
}

@ -0,0 +1,99 @@
#include "dag_data_split.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* One parsed record of the length-prefixed wire format: a heap-allocated
 * payload plus the link to the next record. */
struct DataNode {
    uint32_t dataLength;   /* number of bytes in data */
    char *data;            /* payload, owned by this node (freed by freeDataNodes) */
    struct DataNode *next; /* next record, or NULL at the tail */
};
/*
 * Parse a buffer of consecutive [4-byte length][payload] records into a
 * singly linked list of DataNode. Parsing stops at the first truncated
 * record. Returns the list head; returns NULL for an empty/short buffer or
 * on allocation failure (in which case all partially built nodes are freed).
 */
DataNode* splitData(char *buffer, uint32_t bufferSize) {
    DataNode *head = NULL;
    DataNode *tail = NULL;
    uint32_t offset = 0;

    while (offset < bufferSize) {
        /* Need at least 4 more bytes for the length prefix. */
        if (offset + 4 > bufferSize) {
            break;
        }

        /* BUG FIX: the original read the prefix via *(uint32_t *)(buffer +
         * offset) — an unaligned dereference, which is undefined behavior on
         * strict-alignment targets. memcpy is the portable equivalent. */
        uint32_t dataLength;
        memcpy(&dataLength, buffer + offset, sizeof(dataLength));
        offset += 4;

        /* Subtraction form so a huge dataLength cannot overflow the
         * uint32_t addition and sneak past the bounds check. */
        if (dataLength > bufferSize - offset) {
            break;
        }

        DataNode *newNode = (DataNode *)malloc(sizeof(DataNode));
        if (newNode == NULL) {
            perror("Memory allocation failed");
            freeDataNodes(head); /* release the nodes already built */
            return NULL;
        }

        newNode->data = (char *)malloc(dataLength);
        if (newNode->data == NULL) {
            free(newNode);
            perror("Memory allocation failed");
            freeDataNodes(head); /* release the nodes already built */
            return NULL;
        }
        memcpy(newNode->data, buffer + offset, dataLength);
        newNode->dataLength = dataLength;
        newNode->next = NULL;

        if (head == NULL) {
            head = newNode;
        } else {
            tail->next = newNode;
        }
        tail = newNode;

        offset += dataLength;
    }

    return head;
}
/* Release every node in the list together with its payload. Safe on NULL. */
void freeDataNodes(DataNode *head) {
    DataNode *current = head;
    while (current != NULL) {
        DataNode *successor = current->next; /* save before freeing */
        free(current->data);
        free(current);
        current = successor;
    }
}
/* Dump each node's position (0-based) and payload length to stdout. */
void printDataList(DataNode *head) {
    int position = 0;
    for (DataNode *node = head; node != NULL; node = node->next) {
        printf("Data %d: Length = %u\n", position, node->dataLength);
        position++;
    }
}
/* Return the number of nodes in the list (0 for an empty list). */
int getDataNodeCount(DataNode *head) {
    int total = 0;
    for (DataNode *node = head; node != NULL; node = node->next) {
        total++;
    }
    return total;
}
/* Return the payload of the index-th node (1-based), or NULL when the list
 * has fewer than index nodes. */
const char* getDataNodeByIndex(DataNode *head, int index) {
    int position = 1;
    for (DataNode *node = head; node != NULL; node = node->next) {
        if (position == index) {
            return node->data;
        }
        position++;
    }
    return NULL; /* index out of range */
}

@ -0,0 +1,17 @@
#pragma once

#include <stdint.h>

/* Opaque singly linked list node holding one length-prefixed payload. */
typedef struct DataNode DataNode;

/* Parse a buffer of [4-byte length][payload] records into a linked list.
 * Returns the head, or NULL on allocation failure / no complete record. */
DataNode* splitData(char *buffer, uint32_t bufferSize);

/* Free every node in the list along with its payload. */
void freeDataNodes(DataNode *head);

/* Print each node's index and payload length to stdout. */
void printDataList(DataNode *head);

/* Number of nodes in the list. */
int getDataNodeCount(DataNode *head);

/**
 * Return the payload of the index-th node, or NULL if out of range.
 * @param index is from 1 to n
 */
const char* getDataNodeByIndex(DataNode *head, int index);

@ -0,0 +1,59 @@
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include "dag_data_split.h"
/* Naive recursive Fibonacci: fib(0)=0, fib(1)=1. */
unsigned long int fib(unsigned long int n) {
    return (n <= 1) ? n : fib(n - 1) + fib(n - 2);
}
/*
 * Fan-in workload: stdin carries two length-prefixed records, each holding a
 * decimal number. Computes fib of both and writes the sum breakdown to
 * stdout. Returns 0 on success, 1 on any input/parse failure.
 */
int main() {
    char buffer[1024]; // Buffer to store input data

    ssize_t bytes_read = read(0, buffer, sizeof(buffer));
    if (bytes_read < 0) {
        perror("Error reading input");
        return 1;
    }

    DataNode *dataList = splitData(buffer, bytes_read);
    if (dataList == NULL) {
        fprintf(stderr, "Failed to split data.\n");
        return 1;
    }

    unsigned long int num1, num2;
    /* NOTE(review): splitData does not NUL-terminate payloads; sscanf below
     * relies on a non-digit byte following the number — TODO confirm. */
    const char *firstdata = getDataNodeByIndex(dataList, 1);
    const char *seconddata = getDataNodeByIndex(dataList, 2);

    if (firstdata == NULL || seconddata == NULL) {
        fprintf(stderr, "Not enough data for two numbers.\n");
        freeDataNodes(dataList);
        return 1;
    }

    if (sscanf(firstdata, "%lu", &num1) != 1 || sscanf(seconddata, "%lu", &num2) != 1) {
        fprintf(stderr, "Failed to parse the numbers correctly.\n");
        freeDataNodes(dataList);
        return 1;
    }

    unsigned long int fib1 = fib(num1);
    unsigned long int fib2 = fib(num2);
    unsigned long int sum = fib1 + fib2;

    // Prepare output string
    char output[1024];
    int len = snprintf(output, sizeof(output), "Fibonacci(%lu) + Fibonacci(%lu) = %lu + %lu = %lu\n", num1, num2, fib1, fib2, sum);

    /* BUG FIX: snprintf can return a negative value (encoding error) or a
     * count larger than the buffer (truncation); clamp before write(). */
    if (len < 0) len = 0;
    if ((size_t)len >= sizeof(output)) len = sizeof(output) - 1;

    // Write to stdout
    write(1, output, len);

    /* Free the list (this comment was mojibake in the original). */
    freeDataNodes(dataList);
    return 0;
}

@ -0,0 +1,17 @@
{
"active": true,
"name": "fibona1",
"path": "fibonacci2_wasm.so",
"port": 10000,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}

@ -0,0 +1,68 @@
{
"active": true,
"name": "work",
"path": "work_wasm.so",
"port": 10000,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work2", "work3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work2",
"path": "work_wasm.so",
"port": 10001,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work3",
"path": "work_wasm.so",
"port": 10002,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work4",
"path": "fibonacciadd_wasm.so",
"port": 10003,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
}

@ -25,9 +25,9 @@ pid2=$!
pid3=$!
./test_8c.sh $f4 $duration $concurrency 40k.jpg 10009 2>&1 &
pid4=$!
wait -f $pid1
wait -f $pid2
wait -f $pid3
wait -f $pid4
wait $pid1
wait $pid2
wait $pid3
wait $pid4
printf "[OK]\n"

@ -0,0 +1,46 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#define OUTPUT_BUFFER_SIZE (1024 * 5) // 5KB
/*
 * Reads up to 10KB from stdin and echoes it back as two labeled 5KB halves
 * in a single combined write. Returns 0 on success, 1 on allocation or read
 * failure.
 */
int main() {
    /* BUG FIX: use calloc so that when stdin delivers fewer than 10KB the
     * unread tail is deterministic zero bytes instead of uninitialized heap
     * (the original printed whatever garbage the allocator handed back). */
    char *data = (char *)calloc(10 * 1024, sizeof(char)); // Allocate 10KB
    if (!data) {
        fprintf(stderr, "Failed to allocate memory.\n");
        return 1;
    }

    ssize_t bytesRead = read(STDIN_FILENO, data, 10 * 1024);
    if (bytesRead == -1) {
        fprintf(stderr, "Failed to read data from file.\n");
        free(data);
        return 1;
    }

    // Create a combined buffer to hold the output
    char *combined_output = malloc(2 * OUTPUT_BUFFER_SIZE + 50); // 50 extra bytes for text and safety
    if (!combined_output) {
        fprintf(stderr, "Failed to allocate memory for combined output.\n");
        free(data);
        return 1;
    }

    // Format the output in a single buffer
    int offset = sprintf(combined_output, "Buffer 1 (First 5KB): ");
    memcpy(combined_output + offset, data, OUTPUT_BUFFER_SIZE);
    offset += OUTPUT_BUFFER_SIZE;
    offset += sprintf(combined_output + offset, " Buffer 2 (Second 5KB): ");
    memcpy(combined_output + offset, data + OUTPUT_BUFFER_SIZE, OUTPUT_BUFFER_SIZE);

    // Print everything in one go
    fwrite(combined_output, sizeof(char), offset + OUTPUT_BUFFER_SIZE, stdout);
    printf("\n");

    // Clean up
    free(data);
    free(combined_output);
    return 0;
}

@ -0,0 +1,26 @@
import sys
def process_file(input_file, noop_functions):
    """Extract column 6 (comma-separated) from every line of *input_file* that
    mentions one of *noop_functions*, and write the collected values, one per
    line, into a ``<noop>.txt`` file per function (in the current directory).
    """
    data = {noop: [] for noop in noop_functions}

    with open(input_file, "r") as infile:
        for line in infile:
            fields = line.split(",")
            for noop in noop_functions:
                if noop in line:
                    # BUG FIX: lines with fewer than 7 comma-separated fields
                    # raised IndexError; skip them instead of crashing.
                    if len(fields) > 6:
                        data[noop].append(fields[6])

    for noop, values in data.items():
        with open(f"{noop}.txt", "w") as outfile:
            outfile.write("\n".join(values))
if __name__ == "__main__":
noop_functions = ["noop1", "noop2", "noop3", "noop4", "noop5"]
argv = sys.argv[1:]
if len(argv) < 1:
print("usage:", sys.argv[0], "file_dir percentage")
sys.exit()
input_file = argv[0]
process_file(input_file, noop_functions)

@ -0,0 +1,11 @@
#include <stdio.h>

/* Intentionally empty — presumably a minimal baseline workload so measured
 * latency reflects pure runtime/sandbox overhead (TODO confirm). */
void noop() {
}

/* Invoke the no-op once and exit successfully. */
int main() {
    noop();
    return 0;
}

@ -0,0 +1,22 @@
import os
def calculate_average(filename):
    """Return the arithmetic mean of the integer values in *filename* (one
    value per line). Blank lines are ignored; an empty file yields 0.
    """
    with open(filename, "r") as file:
        # BUG FIX: a trailing newline / blank line made int("") raise
        # ValueError; filter out empty lines before converting.
        values = [int(line.strip()) for line in file if line.strip()]
    return sum(values) / len(values) if values else 0
def main():
    """Report the average recorded value for each noop function's data file."""
    noop_functions = ["noop1", "noop2", "noop3", "noop4", "noop5"]
    for noop in noop_functions:
        filename = f"{noop}.txt"
        if os.path.exists(filename):
            average = calculate_average(filename)
            # Name the file in the message: the original printed the literal
            # text "(unknown)" for all five files, making results
            # indistinguishable.
            print(f"Average for {filename}: {average}")
        else:
            print(f"{filename} does not exist.")
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

@ -36,7 +36,7 @@ def get_values(key, value, miss_deadline_rate, total_latency, running_times, pre
before_dot = file_name.split(".")[0]
joint_f_name = before_dot + "_total_time.txt"
cmd='python3 ~/sledge-serverless-framework/runtime/tests/meet_deadline_percentage.py %s 50' % file_name
cmd='python3 /home/hai/sledge/sledge/runtime/tests/meet_deadline_percentage.py %s 50' % file_name
rt=os.popen(cmd).read().strip()
cmd2='mv total_time.txt %s' % joint_f_name
os.popen(cmd2)

@ -0,0 +1,111 @@
#include "dag_split_image.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Singly linked list node holding one length-prefixed payload extracted
 * from the packed input buffer (format: 4-byte length, then the bytes). */
struct DataNode {
    uint32_t dataLength;       /* number of bytes in data */
    unsigned char *data;       /* heap-allocated copy of the payload */
    struct DataNode *next;     /* next node, or NULL at the tail */
};
/* Parse a packed stream of [4-byte native-endian length][payload] records
 * into a linked list.  A truncated trailing record ends parsing silently.
 * Returns the list head; NULL on allocation failure (any partially built
 * list is freed) or when the buffer holds no complete record. */
DataNode* splitData(unsigned char *buffer, uint32_t bufferSize) {
    DataNode *head = NULL;
    DataNode *tail = NULL;
    uint32_t offset = 0;
    while (offset + 4 <= bufferSize) {
        uint32_t dataLength;
        /* memcpy avoids the unaligned-access UB of the original
         * *(uint32_t *)(buffer + offset) cast on strict-alignment targets. */
        memcpy(&dataLength, buffer + offset, sizeof dataLength);
        offset += 4;
        /* Overflow-safe bounds check (offset <= bufferSize holds here). */
        if (dataLength > bufferSize - offset) {
            break;
        }
        DataNode *newNode = (DataNode *)malloc(sizeof(DataNode));
        if (newNode == NULL) {
            perror("Memory allocation failed");
            freeDataNodes(head);
            return NULL;
        }
        newNode->data = (unsigned char *)malloc(dataLength);
        if (newNode->data == NULL) {
            free(newNode);
            perror("Memory allocation failed");
            freeDataNodes(head);
            return NULL;
        }
        memcpy(newNode->data, buffer + offset, dataLength);
        newNode->dataLength = dataLength;
        newNode->next = NULL;
        if (head == NULL) {
            head = newNode;
        } else {
            tail->next = newNode;
        }
        tail = newNode;
        offset += dataLength;
    }
    return head;
}
/* Release every node in the list along with its payload buffer. */
void freeDataNodes(DataNode *head) {
    DataNode *cur = head;
    while (cur != NULL) {
        DataNode *victim = cur;
        cur = cur->next;
        free(victim->data);
        free(victim);
    }
}
/* Print the position and byte length of every node, in list order. */
void printDataList(DataNode *head) {
    int idx = 0;
    for (DataNode *cur = head; cur != NULL; cur = cur->next) {
        printf("Data %d: Length = %u\n", idx, cur->dataLength);
        idx++;
    }
}
/* Return the number of nodes in the list (0 for an empty list). */
int getDataNodeCount(DataNode *head) {
    int n = 0;
    for (DataNode *cur = head; cur != NULL; cur = cur->next) {
        n++;
    }
    return n;
}
/* Return the payload of the index-th node (1-based), or NULL when the
 * index is out of range. */
unsigned char* getDataNodeByIndex(DataNode *head, int index) {
    int pos = 1;
    for (DataNode *cur = head; cur != NULL; cur = cur->next) {
        if (pos == index) {
            return cur->data;
        }
        pos++;
    }
    return NULL;
}
/* Return the byte length of the index-th node's payload (1-based),
 * or 0 when the index is out of range. */
uint32_t getImageDataSize(DataNode *head, int index){
    int pos = 1;
    for (DataNode *cur = head; cur != NULL; cur = cur->next) {
        if (pos == index) {
            return cur->dataLength;
        }
        pos++;
    }
    return 0;
}

@ -0,0 +1,18 @@
#pragma once
#include <stdint.h>

/* Opaque linked-list node holding one length-prefixed payload; layout is
 * defined in dag_split_image.c. */
typedef struct DataNode DataNode;

/* Parse a packed [4-byte length][payload] stream into a list.
 * Returns the head, or NULL on allocation failure / no complete record. */
DataNode* splitData(unsigned char *buffer, uint32_t bufferSize);
/* Free every node and its payload. */
void freeDataNodes(DataNode *head);
/* Print index and length of each node to stdout. */
void printDataList(DataNode *head);
/* Number of nodes in the list. */
int getDataNodeCount(DataNode *head);
/**
 * Payload of the index-th node, or NULL if out of range.
 * @param index is from 1 to n (1-based)
 */
unsigned char* getDataNodeByIndex(DataNode *head, int index);
/* Payload length of the index-th node (1-based), or 0 if out of range. */
uint32_t getImageDataSize(DataNode *head, int index);

@ -0,0 +1,79 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "dag_split_image.h"
#define MAX_IMG_SZ 8*1025*1024
/* Read a packed pair of BMP images from stdin (4-byte length prefixes),
 * overlay the first 300 bytes of every 2400-byte row of image 2 onto
 * image 1 (picture-in-picture), and stream the result to stdout. */
int main()
{
    unsigned char *zInpbuf = malloc(MAX_IMG_SZ);
    if (!zInpbuf)
    {
        perror("malloc");
        return -1;
    }
    ssize_t imgSz = read(0, zInpbuf, MAX_IMG_SZ);
    if (imgSz <= 0)
    {
        perror("read");
        free(zInpbuf);
        return -1;
    }
    DataNode *dataList = splitData(zInpbuf, (uint32_t)imgSz);
    if (dataList == NULL) {
        fprintf(stderr, "Failed to split data.\n");
        free(zInpbuf);              /* fix: was leaked on this error path */
        return 1;
    }
    unsigned char *Imagedata1 = getDataNodeByIndex(dataList, 1);
    unsigned char *Imagedata2 = getDataNodeByIndex(dataList, 2);
    uint32_t imageSize1 = getImageDataSize(dataList, 1);
    uint32_t imageSize2 = getImageDataSize(dataList, 2);
    /* Fix: the original dereferenced these without checking, crashing when
     * the stream carried fewer than two records or a headerless image. */
    if (Imagedata1 == NULL || Imagedata2 == NULL || imageSize1 < 54) {
        fprintf(stderr, "Input must contain two images.\n");
        freeDataNodes(dataList);
        free(zInpbuf);
        return 1;
    }
    FILE *out_bin = stdout;
    int x, y;
    /* Copy the 54-byte BMP header of the large image verbatim.  (The
     * original also computed an unused y value here; removed.) */
    for (int i = 0; i < 54; i++)
    {
        x = Imagedata1[i];
        fwrite(&x, 1, 1, out_bin);
    }
    /* Overlay: for each 2400-byte stretch of the big image, substitute the
     * first 300 bytes with the small image's pixels until it is exhausted.
     * NOTE(review): 2400/300 presumably match the image row widths in the
     * test data — confirm against the workload's BMP dimensions. */
    int i = 0;
    unsigned long big_img_offset = 54;
    unsigned long small_img_offset = 54;
    while (big_img_offset < imageSize1)
    {
        if (i == 2400) i = 0;
        x = Imagedata1[big_img_offset++];
        if (i < 300 && small_img_offset < imageSize2)
        {
            y = Imagedata2[small_img_offset++];
            fwrite(&y, 1, 1, out_bin);
        }
        else
        {
            fwrite(&x, 1, 1, out_bin);
        }
        i++;
    }
    /* (A second copy loop with the same condition followed here in the
     * original; it could never execute and has been removed.) */
    freeDataNodes(dataList);        /* fix: list was leaked */
    free(zInpbuf);
    return 0;
}

@ -2703,7 +2703,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.1.undefined"
}
},
"nbformat": 4,

@ -1 +1,2 @@
sudo chsh -s /bin/bash xiaosuGW
sudo chsh -s /bin/bash hai

@ -20,10 +20,10 @@ declare project_path="$(
echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=3300
#export SLEDGE_SCHEDULER=SRSF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_CPU_SPEED=3300
export SLEDGE_SCHEDULER=EDF
#export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=1
export SLEDGE_SCHEDULER=EDF
export SLEDGE_SANDBOX_PERF_LOG=$path/$output
@ -37,7 +37,8 @@ cd $project_path/runtime/bin
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/mulitple_linear_chain.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing3.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing4.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing4.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json

@ -21,23 +21,19 @@ echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2400
export SLEDGE_SCHEDULER=SRSF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SCHEDULER=EDF
export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=1
#export SLEDGE_SCHEDULER=EDF
export SLEDGE_SANDBOX_PERF_LOG=$path/$output
echo $SLEDGE_SANDBOX_PERF_LOG
cd $project_path/runtime/bin
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_big_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_armcifar10.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_png2bmp.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_image_processing.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/mulitple_linear_chain.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing3.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing4.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph2.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/fibc.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_dag_image.json

@ -0,0 +1,32 @@
#!/bin/bash
# Print invocation help on stdout and abort the script with status 1.
usage() {
  printf '%s\n' "$0 [perf output file, chain_function_perf.log or single_function_perf.log or opt_function_perf.log]"
  exit 1
}
# Require exactly one argument: the perf-log output filename.
# (usage already exits 1, so the original's trailing 'exit 1' was dead code.)
if [ "$#" -ne 1 ]; then
  usage
fi
output=$1

# Repository root: two levels above this script's directory.
declare project_path="$(
  cd "$(dirname "$0")/../.."
  pwd
)"
echo "$project_path"
path=$(pwd)

#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2500
export SLEDGE_SCHEDULER=FIFO
#export SLEDGE_SIGALRM_HANDLER=BROADCAST
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=16
#export SLEDGE_SCHEDULER=EDF
# Quote the path so filenames with spaces survive word-splitting.
export SLEDGE_SANDBOX_PERF_LOG="$path/$output"
echo "$SLEDGE_SANDBOX_PERF_LOG"
cd "$project_path/runtime/bin" || exit 1
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_noop1.json

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save