Compare commits

...

23 Commits

Author SHA1 Message Date
hwwang 7da0f72179 修复关于初始化时间的bug
8 months ago
hwwang ed80c30ccf 修复链式中数据流二次拷贝的问题,以及节点扇出数据二次拷贝的问题
9 months ago
hwwang 8990f5c5ed 添加机器学习框架数据获取代码,主要是获取TCP套接字报文大小
9 months ago
hwwang 87672b74c4 添加LLF调度 修改SRSF调度细节
9 months ago
hwwang 87ba8af8a0 fix time bug
9 months ago
hwwang a696655ad5 添加线性拟合计算剩余时间,并设置model_base,当预估时间超过base时间的1.2倍,改为base时间
9 months ago
hwwang 10d9659172 添加机器学习调度框架,考虑如何训练数据
9 months ago
hwwang 97b158f508 添加测试用例
11 months ago
hwwang a66ea1bc25 修改打印的bug,添加图片测试脚本
11 months ago
hwwang 4b2a905e92 为扇入节点添加处理数据的api,格式是四字节数据长度加数据
11 months ago
hwwang a056d25ff4 修改节点扇入时,不用&做分割符,改为增加前四字节作为数据长度
11 months ago
hwwang 46b4325dba 添加图测试文件
12 months ago
hwwang 8eae02580f 添加图测试文件
12 months ago
hwwang 5711145b3b 修改hash表和sandbox_request的互斥锁为读写锁,但现在觉得map不加锁可能会有隐患
12 months ago
hwwang 1854297530 将沙盒输出挂载在一个链表中,修改锁的范围,但是感觉结果不是很好
1 year ago
hwwang bae018314f 修改数据拷贝方式,关于50%的响应获取问题还有待商榷
1 year ago
hwwang 0f9b75d784 asd
1 year ago
hwwang 8ae955fbfc 修改内存释放错误的bug
1 year ago
hwwang 702f5de60d 为hash表添加锁
1 year ago
hwwang 0b1fb6c5d7 修一部分logbug
1 year ago
hwwang 7abc07445d 修改log参数,未修改完
1 year ago
hwwang 3aeed6f94b 已添加DAG,需要修改各部份时间测试代码
1 year ago
hwwang 70fdbb348c need modify thirdparty hashmap
1 year ago

@ -1,3 +1,3 @@
LD_LIBRARY_PATH=/home/hai/sledge-serverless-framework/runtime/bin
SLEDGE_SCHEDULER=EDF
SLEDGE_SANDBOX_PERF_LOG=/home/hai/sledge-serverless-framework/debuglog.txt
LD_LIBRARY_PATH=/home/hai/sledge/sledge/runtime/bin
SLEDGE_SCHEDULER=MDL
SLEDGE_SANDBOX_PERF_LOG=/home/hai/sledge/sledge/runtime/tests/runtime_sandbox_perf_log.log

5
.gitignore vendored

@ -56,6 +56,11 @@ runtime/tests/tmp/
runtime/tests/**/*.csv
runtime/tests/**/*.txt
runtime/tests/**/*.xlsx
runtime/tests/test_data
runtime/tests/*.log
runtime/data/*.txt
runtime/data/*.log
runtime/data/*.json
# Swap Files
*.swp

7
.gitmodules vendored

@ -11,6 +11,9 @@ url = https://github.com/gwsystems/ck.git
[submodule "jsmn"]
path = runtime/thirdparty/jsmn
url = https://github.com/gwsystems/jsmn.git
[submodule "hash"]
path = runtime/thirdparty/hashmap
url = https://github.com/tidwall/hashmap.c.git
[submodule "runtime/tests/gocr"]
path = runtime/tests/gocr
url = https://github.com/gwsystems/gocr.git
@ -25,8 +28,8 @@ url = https://github.com/gwsystems/CMSIS_5_NN.git
branch = sledge
[submodule "runtime/tests/sod"]
path = runtime/tests/sod
url = https://github.com/gwsystems/sod.git
branch = sledge
url = http://47.120.57.226:3000/haiwan/Sod.git
branch = main
[submodule "runtime/tests/speechtotext"]
path = runtime/tests/speechtotext
url = https://github.com/gwsystems/speechtotext.git

@ -9,7 +9,9 @@
"defines": [
"USE_MEM_VM",
"x86_64",
"_GNU_SOURCE"
"_GNU_SOURCE",
"LOG_TO_FILE",
"DEEP_LEARN_SCHDUE"
],
"cStandard": "${default}",
"compilerPath": "/usr/bin/clang",

162
.vscode/launch.json vendored

@ -1,52 +1,114 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Hyde",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/applications/ocr/hyde/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "Preemption",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/preemption/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
"version": "0.2.0",
"configurations": [
{
"name": "Hyde",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/applications/ocr/hyde/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "Preemption",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/experiments/preemption/spec.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "graph",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/bin/sledgert",
"args": [
"${workspaceFolder}/runtime/tests/graph.json"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"sourceFileMap": {
"/sledge/runtime": "${workspaceFolder}/runtime"
},
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "utest",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/utest/map",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"sourceFileMap": {},
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "C/C++ Runner: Debug Session",
"type": "cppdbg",
"request": "launch",
"args": [],
"stopAtEntry": false,
"externalConsole": false,
"cwd": "/home/weihao/sledge/sledge_tree/runtime/tests/noop",
"program": "/home/weihao/sledge/sledge_tree/runtime/tests/noop/build/Debug/outDebug",
"MIMode": "gdb",
"miDebuggerPath": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
}

@ -95,7 +95,26 @@
"compare": "c",
"cstdint": "c",
"format": "c",
"jsmn.h": "c"
"jsmn.h": "c",
"__bit_reference": "c",
"algorithm": "c",
"lock_time.h": "c",
"mcs.h": "c",
"map.h": "c",
"stdio.h": "c",
"hashmap.h": "c",
"mutex": "cpp",
"xmalloc.h": "c",
"stddef.h": "c",
"__mutex_base": "c",
"memory": "c",
"atomic": "c",
"condition_variable": "c",
"ostream": "c",
"stop_token": "c",
"dag_data_split.h": "c",
"scheduler.h": "c",
"priority_queue.h": "c"
},
"files.exclude": {
"**/.git": true,
@ -144,6 +163,7 @@
"**/runtime/thirdparty/http-parser/**": true,
"**/runtime/thirdparty/jsmn/**": true,
"**/runtime/thirdparty/dist/**": true,
"**/runtime/thirdparty/hashmap/**": true,
"*.o": true,
"*.bc": true,
"*.wasm": true

@ -0,0 +1 @@
12

@ -41,6 +41,14 @@ BINARY_NAME=sledgert
# Feature Toggles
# CFLAGS += -DADMISSIONS_CONTROL
# This definition is used when the module is triggered by an HTTP request.
# It retrieves the length of the HTTP message data, the hash table, and the global queue length via a TCP socket.
# These values are then used as features for machine learning training
# CFLAGS += -DDEEP_LEARN_SCHDUE
# This definition determines whether to use the median or the mean to calculate the execution times of the first 256 services.
# CFLAGS += -DGET_AVER_TIME
# Debugging Flags
# Strips out calls to assert() and disables debuglog
@ -61,6 +69,9 @@ CFLAGS += -DLOG_TO_FILE
# CFLAGS += -DOPT_AVOID_GLOBAL_QUEUE
# CFLAGS += -DLOG_RUNTIME_FILE_LOG
CFLAGS += -DLOG_RUNTIME_MEM_LOG
# For machine learning purposes, the following data is collected and saved:
# Length of the HTTP message Hash table Global queue length Service execution time
# CFLAGS += -DLOG_DEEP_LEARN_SCHDUE
# This dumps per module *.csv files containing the cycle a sandbox has been in RUNNING when each
# page is allocated. This helps understand the relationship to memory allocation and execution time.
@ -150,6 +161,7 @@ fetch:
@git config --global --add safe.directory /sledge/runtime/tests/speechtotext/thirdparty/sphinxbase
@git config --global --add safe.directory /sledge/runtime/thirdparty/http-parser
@git config --global --add safe.directory /sledge/runtime/thirdparty/jsmn
@git config --global --add safe.directory /sledge/runtime/thirdparty/hashmap
@git config --global --add safe.directory /sledge/runtime/tests/C-Image-Manip
@git submodule update --init --recursive

@ -0,0 +1,30 @@
import sys
def calculate_average(input_file, column_index):
total = 0
count = 0
with open(input_file, 'r') as f:
for line in f:
columns = line.strip().split(',')
if len(columns) > column_index:
try:
value = float(columns[column_index])
total += value
count += 1
except ValueError:
print(f"error value: {columns[column_index]}")
if count > 0:
average = total / count
print(f"list {column_index + 1} average: {average}")
else:
print("no value")
if __name__ == "__main__":
if len(sys.argv) != 3:
print(" python calculate_average.py input_file column_index")
else:
input_file = sys.argv[1]
column_index = int(sys.argv[2]) - 1
calculate_average(input_file, column_index)

@ -0,0 +1,41 @@
# -*- coding: UTF-8 -*-
import matplotlib.pyplot as plt
def plot_max_rps_comparison():
# Data from the previous description
sizes = ['5KB', '40KB', '105KB', '305KB']
max_rps_before = [121, 85, 63, 33]
max_rps_after = [607, 235, 128, 45]
# Index for each bar position along the x-axis
index = range(len(sizes))
bar_width = 0.35 # width of the bars
# Setting up the plot
fig, ax = plt.subplots()
ax.set_facecolor('#f0f0f0')
# Creating bars for "Before Optimization"
rects1 = ax.bar(index, max_rps_before, bar_width, label='Mixed Task', color='orange')
# Creating bars for "After Optimization" shifted to the right by `bar_width`
rects2 = ax.bar([p + bar_width for p in index], max_rps_after, bar_width, label='Single Task', color='skyblue')
# Adding labels and title
ax.set_xlabel('Image Size(KB)')
ax.set_ylabel('MAX RPS')
ax.set_title('Mixed and Single Task Performance Comparison')
# Setting the position of the x-ticks to be in the middle of the grouped bars
ax.set_xticks([p + bar_width / 2 for p in index])
ax.set_xticklabels(sizes)
# Adding a legend to explain which bars represent before and after optimization
ax.legend()
# Displaying the plot
plt.show()
# Call the function to display the plot
plot_max_rps_comparison()

@ -0,0 +1,51 @@
import matplotlib.pyplot as plt
from matplotlib import font_manager as fm
def load_data(filename):
"""
Load data from a file.
Assumes that each line in the file is a float value.
"""
with open(filename, 'r') as file:
data = file.read().split()
data = [float(i) for i in data]
return data
def main():
# 加载数据
edf_data = [0.83, 1.35, 1.88, 2.36, 1.9]
llf_data = [0.45, 0.4, 0.52, 0.97, 0.9]
# 设置X轴的数据点
x_labels = [50, 60, 70, 80, 90] # 确保数据与这些标签相匹配
font_properties = fm.FontProperties(family='Times New Roman', size=18)
plt.rcParams.update({'font.size': 18, 'font.family': 'Times New Roman'})
# 创建图形和绘制数据
plt.figure(figsize=(10, 5))
ax = plt.gca() # 获取当前的Axes对象ax
ax.set_facecolor('#f0f0f0') # 设置浅灰色背景
plt.plot(x_labels, edf_data, marker='s', linestyle='-', color='#C8503D', markersize=8, label='EDF')
plt.plot(x_labels, llf_data, marker='^', linestyle='-', color='#00008B', markersize=8, label='LLF')
# 添加标题、标签和图例
plt.title('5KB-1.5*Deadline', fontsize=20, fontproperties=font_properties)
plt.xlabel('Load (% of maximum RPS)', fontproperties=font_properties)
plt.ylabel('Deadline Miss Rate (%)', fontproperties=font_properties)
plt.legend(prop=font_properties)
# 设置X轴刻度
plt.xticks(range(50, 91, 10))
# 设置网格
plt.grid(True)
# 移除边框的四边刻度线
plt.tick_params(axis='both', which='both', length=0) # 移除刻度线
# 显示图形
plt.show()
if __name__ == "__main__":
main()

@ -0,0 +1,24 @@
# split_logs.py
def split_logs(input_file):
modules = {
"resize1": [],
"png2bmp1": [],
"lpd_wasm1": [],
"cifar10_1": [],
"work1": []
}
with open(input_file, 'r') as f:
for line in f:
for module in modules.keys():
if module in line:
modules[module].append(line.strip())
break
for module, entries in modules.items():
with open(f"{module}.txt", 'w') as outfile:
outfile.write("\n".join(entries) + "\n")
if __name__ == "__main__":
split_logs("sledge.log")

@ -0,0 +1,20 @@
import sys
def split_columns(input_file):
columns = []
with open(input_file, 'r') as f:
for line in f:
parts = line.strip().split(',')
for i, part in enumerate(parts):
if len(columns) <= i:
columns.append([])
columns[i].append(part)
for i, column in enumerate(columns):
with open(f"{input_file[:-4]}_column_{i + 1}.txt", 'w') as outfile:
outfile.write("\n".join(column) + "\n")
if __name__ == "__main__":
for input_file in sys.argv[1:]:
split_columns(input_file)

File diff suppressed because one or more lines are too long

@ -15,4 +15,5 @@ void admissions_info_initialize(struct admissions_info *self, char* module_name,
void admissions_info_update(struct admissions_info *self, uint64_t execution_duration);
uint64_t admission_info_get_percentile(struct admissions_info *self);
uint64_t admission_info_get_average(struct admissions_info *self);

@ -9,12 +9,14 @@ typedef struct sandbox_request *(*global_request_scheduler_add_fn_t)(void *);
typedef int (*global_request_scheduler_remove_fn_t)(struct sandbox_request **);
typedef int (*global_request_scheduler_remove_if_earlier_fn_t)(struct sandbox_request **, uint64_t);
typedef uint64_t (*global_request_scheduler_peek_fn_t)(void);
typedef int (*global_request_scheduler_size_fn_t)(void);
struct global_request_scheduler_config {
global_request_scheduler_add_fn_t add_fn;
global_request_scheduler_remove_fn_t remove_fn;
global_request_scheduler_remove_if_earlier_fn_t remove_if_earlier_fn;
global_request_scheduler_peek_fn_t peek_fn;
global_request_scheduler_size_fn_t size_fn;
};
@ -23,3 +25,4 @@ struct sandbox_request *global_request_scheduler_add(struct sandbox_request *);
int global_request_scheduler_remove(struct sandbox_request **);
int global_request_scheduler_remove_if_earlier(struct sandbox_request **, uint64_t targed_deadline);
uint64_t global_request_scheduler_peek(void);
int global_request_scheduler_size(void);

@ -0,0 +1,180 @@
#pragma once
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
#include "xmalloc.h"
/* Simple K-V store based on The Practice of Programming by Kernighan and Pike */
/* Bucket count is sized to be a prime that is approximately 20% larger than the desired capacity (6k keys) */
#define MAP_BUCKET_COUNT 7907
#define MAP_HASH jenkins_hash
struct map_node {
struct map_node *next;
char *key;
void *value;
uint32_t key_len;
uint32_t value_len;
uint32_t hash;
bool manage_mvalue;
};
struct map_bucket {
struct map_node *head;
};
struct hashmap {
struct map_bucket buckets[MAP_BUCKET_COUNT];
};
static inline void
map_init(struct hashmap *restrict map)
{
for (int i = 0; i < MAP_BUCKET_COUNT; i++) {
map->buckets[i].head = NULL;
}
}
/* See https://en.wikipedia.org/wiki/Jenkins_hash_function */
static inline uint32_t
jenkins_hash(char *key, uint32_t key_len)
{
uint32_t i = 0;
uint32_t hash = 0;
while (i != key_len) {
hash += key[i++];
hash += hash << 10;
hash ^= hash >> 6;
}
hash += hash << 3;
hash ^= hash >> 11;
hash += hash << 15;
return hash;
}
static inline void *
map_get(struct hashmap *map, char *key, uint32_t key_len, uint32_t *ret_value_len)
{
void *value = NULL;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) {
value = node->value;
*ret_value_len = node->value_len;
goto DONE;
}
}
if (value == NULL) *ret_value_len = 0;
DONE:
return value;
}
/**
* @manage_mvalue manage _ mvalue determines whether the hash table or the caller manages value memory, and true is managed by the hash.
*/
static inline bool
map_set(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len, bool manage_mvalue)
{
bool did_set = false;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) goto DONE;
}
struct map_node *new_node = (struct map_node *)xmalloc(sizeof(struct map_node));
*(new_node) = (struct map_node){ .hash = hash,
.key = xmalloc(key_len),
.key_len = key_len,
.value_len = value_len,
.next = bucket->head };
// Copy Key and Value
memcpy(new_node->key, key, key_len);
if (manage_mvalue) {
new_node->value = xmalloc(value_len);
memcpy(new_node->value, value, value_len);
} else {
new_node->value = value;
}
new_node->manage_mvalue = manage_mvalue;
bucket->head = new_node;
did_set = true;
DONE:
return did_set;
}
/**
* @returns boolean if node was deleted or not
*/
static inline bool
map_delete(struct hashmap *map, char *key, uint32_t key_len)
{
bool did_delete = false;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
struct map_node *prev = NULL;
struct map_node *node = bucket->head;
while (node != NULL) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) {
if (prev == NULL) {
bucket->head = node->next;
} else {
prev->next = node->next;
}
free(node->key);
node->key = NULL;
if (node->manage_mvalue) {
free(node->value);
node->value = NULL;
}
free(node);
node = NULL;
did_delete = true;
break;
}
prev = node;
node = node->next;
}
return did_delete;
}
static inline void
map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len)
{
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) {
node->value_len = value_len;
node->value = realloc(node->value, value_len);
assert(node->value);
if (node->manage_mvalue)
{
memcpy(node->value, value, value_len);
}else node->value = value;
return;
}
}
panic("map_upsert: key not found");
}

@ -0,0 +1,246 @@
#pragma once
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
//#include "lock.h"
#include "xmalloc.h"
typedef pthread_rwlock_t rwlock_t;
#define LOCK_INIT_RW(lock) pthread_rwlock_init(lock, NULL)
#define LOCK_RDLOCK_RW(lock) pthread_rwlock_rdlock(lock)
#define LOCK_WRLOCK_RW(lock) pthread_rwlock_wrlock(lock)
#define LOCK_UNLOCK_RW(lock) pthread_rwlock_unlock(lock)
/* Simple K-V store based on The Practice of Programming by Kernighan and Pike */
/* Bucket count is sized to be a prime that is approximately 20% larger than the desired capacity (6k keys) */
#define MAP_BUCKET_COUNT 7907
#define MAP_HASH jenkins_hash
struct map_node {
struct map_node *next;
char *key;
void *value;
uint32_t key_len;
uint32_t value_len;
uint32_t hash;
bool manage_mvalue;
};
struct map_bucket {
rwlock_t lock;
struct map_node *head;
};
struct hashmap {
struct map_bucket buckets[MAP_BUCKET_COUNT];
};
static inline void
map_init(struct hashmap *restrict map)
{
for (int i = 0; i < MAP_BUCKET_COUNT; i++) {
map->buckets[i].head = NULL;
LOCK_INIT_RW(&map->buckets[i].lock);
}
}
/* See https://en.wikipedia.org/wiki/Jenkins_hash_function */
static inline uint32_t
jenkins_hash(char *key, uint32_t key_len)
{
uint32_t i = 0;
uint32_t hash = 0;
while (i != key_len) {
hash += key[i++];
hash += hash << 10;
hash ^= hash >> 6;
}
hash += hash << 3;
hash ^= hash >> 11;
hash += hash << 15;
return hash;
}
static inline void *
map_get(struct hashmap *map, char *key, uint32_t key_len, uint32_t *ret_value_len)
{
void *value = NULL;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
LOCK_RDLOCK_RW(&bucket->lock);
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) {
value = node->value;
*ret_value_len = node->value_len;
goto DONE;
}
}
if (value == NULL) *ret_value_len = 0;
DONE:
LOCK_UNLOCK_RW(&bucket->lock);
return value;
}
/**
* @manage_mvalue manage _ mvalue determines whether the hash table or the caller manages value memory, and true is managed by the hash.
*/
static inline bool
map_set(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len, bool manage_mvalue)
{
bool did_set = false;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
LOCK_WRLOCK_RW(&bucket->lock);
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) goto DONE;
}
struct map_node *new_node = (struct map_node *)xmalloc(sizeof(struct map_node));
*(new_node) = (struct map_node){ .hash = hash,
.key = xmalloc(key_len),
.key_len = key_len,
.value_len = value_len,
.next = bucket->head };
// Copy Key and Value
memcpy(new_node->key, key, key_len);
if (manage_mvalue) {
new_node->value = xmalloc(value_len);
memcpy(new_node->value, value, value_len);
} else {
new_node->value = value;
}
new_node->manage_mvalue = manage_mvalue;
bucket->head = new_node;
did_set = true;
DONE:
LOCK_UNLOCK_RW(&bucket->lock);
return did_set;
}
/**
* @returns boolean if node was deleted or not
*/
static inline bool
map_delete(struct hashmap *map, char *key, uint32_t key_len)
{
bool did_delete = false;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
LOCK_WRLOCK_RW(&bucket->lock);
struct map_node *prev = NULL;
struct map_node *node = bucket->head;
while (node != NULL) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) {
if (prev == NULL) {
bucket->head = node->next;
} else {
prev->next = node->next;
}
free(node->key);
node->key = NULL;
if (node->manage_mvalue) {
free(node->value);
node->value = NULL;
}
free(node);
node = NULL;
did_delete = true;
break;
}
prev = node;
node = node->next;
}
LOCK_UNLOCK_RW(&bucket->lock);
return did_delete;
}
static inline void
map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len)
{
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
LOCK_WRLOCK_RW(&bucket->lock);
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash && memcmp(node->key, key, key_len) == 0) {
node->value_len = value_len;
node->value = realloc(node->value, value_len);
assert(node->value);
if (node->manage_mvalue)
{
memcpy(node->value, value, value_len);
}else node->value = value;
goto DONE;
}
}
panic("map_upsert: key not found");
/*struct map_node *new_node = (struct map_node *)xmalloc(sizeof(struct map_node));
*(new_node) = (struct map_node){ .hash = hash,
.key = xmalloc(key_len),
.key_len = key_len,
.value = xmalloc(value_len),
.value_len = value_len,
.next = bucket->head };
assert(new_node->key);
assert(new_node->value);
// Copy Key and Value
memcpy(new_node->key, key, key_len);
memcpy(new_node->value, value, value_len);
bucket->head = new_node;*/
DONE:
LOCK_UNLOCK_RW(&bucket->lock);
}
/*static inline void
map_destroy(struct hashmap *map) {
for (int i = 0; i < MAP_BUCKET_COUNT; i++) {
LOCK_LOCK_RW(&map->buckets[i].lock);
struct map_node *current = map->buckets[i].head;
struct map_node *next;
while (current != NULL) {
next = current->next;
free(current->key);
if (current->manage_mvalue && current->value != NULL) {
free(current->value);
}
free(current);
current = next;
}
LOCK_UNLOCK_RW(&map->buckets[i].lock);
pthread_mutex_destroy(&map->buckets[i].lock);
// 确保头指针设置为 NULL
map->buckets[i].head = NULL;
}
// 如果 hashmap 是动态分配的,这里还需要释放 hashmap 结构本身
// free(map);
}*/

@ -39,7 +39,14 @@
#warning \
"MODULE_MAX_PENDING_CLIENT_REQUESTS likely exceeds the value in /proc/sys/net/core/somaxconn and thus may be silently truncated";
#endif
#ifdef DEEP_LEARN_SCHDUE
struct MDL
{
int hash_node;
int global_queue_node;
int http_data_size;
};
#endif
struct module {
char name[MODULE_MAX_NAME_LENGTH];
char path[MODULE_MAX_PATH_LENGTH];
@ -78,7 +85,16 @@ struct module {
/* Entry Function to invoke serverless function */
mod_main_fn_t main;
struct module *next_module; /* the next module in the chain */
struct module **next_module; /* the next module in the DAG */
struct module **pre_module; /* the previous module in the DAG */
char **next_module_names; /* the next modules name in the DAG */
uint32_t next_module_count;
uint32_t pre_module_count;
bool runtime_visited; /* used for calculating the estimated time */
uint32_t run_priority;/* Used for prioritizing data fan-in to a node */
#ifdef DEEP_LEARN_SCHDUE
struct MDL mdl;/*save data for deep learn */
#endif
};
/*************************

@ -24,6 +24,9 @@ perf_window_initialize(struct perf_window *self, char* module_name)
self->count = 0;
memset(&self->by_duration, 0, sizeof(struct execution_node) * PERF_WINDOW_BUFFER_SIZE);
memset(&self->by_termination, 0, sizeof(uint16_t) * PERF_WINDOW_BUFFER_SIZE);
#ifdef GET_AVER_TIME
memset(&self->by_duration_for_mdl, 0, sizeof(uint64_t) * PERF_WINDOW_BUFFER_SIZE);
#endif
}
@ -90,12 +93,18 @@ perf_window_add(struct perf_window *self, uint64_t value)
self->by_termination[i] = i;
self->by_duration[i] = (struct execution_node){ .execution_time = value,
.by_termination_idx = i };
#ifdef GET_AVER_TIME
self->by_duration_for_mdl[i] = value;
#endif
}
self->count = PERF_WINDOW_BUFFER_SIZE;
goto done;
}
/* Otherwise, replace the oldest value, and then sort */
#ifdef GET_AVER_TIME
self->by_duration_for_mdl[self->count % PERF_WINDOW_BUFFER_SIZE] = value;
#endif
uint16_t idx_of_oldest = self->by_termination[self->count % PERF_WINDOW_BUFFER_SIZE];
bool check_up = value > self->by_duration[idx_of_oldest].execution_time;
@ -154,6 +163,33 @@ perf_window_get_percentile(struct perf_window *self, int percentile, int precomp
return self->by_duration[size * percentile / 100].execution_time;
}
#ifdef GET_AVER_TIME
static inline uint64_t
perf_window_get_average(struct perf_window *self)
{
assert(self != NULL);
int size = self->count;
if (size == 0) {
return 0;
} else if (size < PERF_WINDOW_BUFFER_SIZE) {
uint64_t average = 0;
for (size_t i = 0; i < size; i++)
{
average += self->by_duration_for_mdl[i];
}
average = average / size;
return average;
} else{
uint64_t average = 0;
for (size_t i = 0; i < PERF_WINDOW_BUFFER_SIZE; i++) {
average += self->by_duration_for_mdl[i];
}
average /= PERF_WINDOW_BUFFER_SIZE;
return average;
}
}
#endif
/**
* Returns the total count of executions
* @returns total count

@ -26,6 +26,9 @@ struct execution_node {
struct perf_window {
char name[32];
struct execution_node by_duration[PERF_WINDOW_BUFFER_SIZE];
#ifdef GET_AVER_TIME
uint64_t by_duration_for_mdl[PERF_WINDOW_BUFFER_SIZE];
#endif
uint16_t by_termination[PERF_WINDOW_BUFFER_SIZE];
uint64_t count;
lock_t lock;

@ -298,7 +298,9 @@ static inline int
priority_queue_length_nolock(struct priority_queue *self)
{
assert(self != NULL);
#ifndef DEEP_LEARN_SCHDUE
assert(!listener_thread_is_running());
#endif
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
return self->size;

@ -44,7 +44,7 @@ extern uint32_t runtime_worker_threads_count;
extern int runtime_worker_threads_argument[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern uint64_t runtime_worker_threads_deadline[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern uint64_t runtime_worker_threads_remaining_slack[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern uint64_t runtime_worker_threads_laxity[RUNTIME_WORKER_THREAD_CORE_COUNT];
extern void runtime_initialize(void);
extern void runtime_set_pthread_prio(pthread_t thread, unsigned int nice);
extern void runtime_set_resource_limits_to_max(void);

@ -115,6 +115,24 @@ sandbox_get_srsf_priority(void *element)
return remaining_slack;
};
static inline uint64_t
sandbox_get_mdl_priority(void *element)
{
struct sandbox *sandbox = (struct sandbox *)element;
uint64_t now = __getcycles();
uint64_t remaining_slack_mdl = sandbox->remaining_slack - (now - sandbox->last_update_timestamp);
return remaining_slack_mdl;
};
static inline uint64_t
sandbox_get_llf_priority(void *element)
{
struct sandbox *sandbox = (struct sandbox *)element;
uint64_t now = __getcycles();
uint64_t Laxity_llf = sandbox->laxity - (now - sandbox->last_update_timestamp);
return Laxity_llf;
};
/**
* Maps a sandbox fd to an underlying host fd
* Returns error condition if the file_descriptor to set does not contain sandbox preopen magic
@ -242,9 +260,26 @@ sandbox_mem_print_perf(struct sandbox *sandbox)
* becomes more intelligent, then peak linear memory size needs to be tracked
* seperately from current linear memory size.
*/
mem_log("%d,%u,%s():%d,%s,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n", worker_thread_idx, sandbox->id,
mem_log("%d,%lu,%s():%d,%s,%u,%lu,%lu,%lu,%lu,%lu,%lu,%lu,%u\n", worker_thread_idx, sandbox->id,
sandbox->module->name, sandbox->module->port, sandbox_state_stringify(sandbox->state),
sandbox->module->relative_deadline_us, total_time_us, queued_us, initializing_us, runnable_us,
running_us, blocked_us, returned_us, sandbox->linear_memory_size);
}
static inline void
sandbox_MDL_print_perf(struct sandbox *sandbox)
{
#ifndef LOG_DEEP_LEARN_SCHDUE
return;
#endif
/* If the log was not defined by an environment variable, early out */
if (runtime_sandbox_perf_log == NULL) return;
#ifdef DEEP_LEARN_SCHDUE
if (sandbox->module->next_module == NULL) {
uint64_t total_time = (sandbox->completion_timestamp - sandbox->request_arrival_timestamp) / runtime_processor_speed_MHz;
fprintf(runtime_sandbox_perf_log, "%s,%lu,%d,%d,%d\n", sandbox->module->name, total_time,sandbox->module->mdl.hash_node,sandbox->module->mdl.global_queue_node,sandbox->module->mdl.http_data_size);
}
#endif
}

@ -12,10 +12,17 @@
#include "module.h"
#include "runtime.h"
#include "sandbox_state.h"
#include "lock.h"
struct sandbox_pre_functions_output {
char * previous_function_output;
ssize_t output_length;
uint32_t run_priority;
struct sandbox_pre_functions_output* next;
};
struct sandbox_request {
uint64_t id;
bool request_from_outside; /* true is yes, false is no */
bool request_from_outside; /* true is yes, false is no */
struct module * module;
char * arguments;
int socket_descriptor;
@ -24,10 +31,13 @@ struct sandbox_request {
uint64_t enqueue_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */
uint64_t last_update_timestamp; /* cycles */
uint64_t remaining_slack; /* cycles */
char * previous_function_output;
ssize_t output_length;
ssize_t previous_request_length;
uint64_t remaining_slack; /* cycles */
uint64_t laxity; /* cycles */
struct sandbox_pre_functions_output *pre_functions_output;
pthread_spinlock_t lock;
char * previous_function_output;
ssize_t output_length;
ssize_t previous_request_length;
/*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) * runtime_admissions_granularity / relative deadline (cycles)
@ -73,7 +83,7 @@ sandbox_request_log_allocation(struct sandbox_request *sandbox_request)
static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, bool request_from_outside, ssize_t request_length,
char *arguments, int socket_descriptor, const struct sockaddr *socket_address,
uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp, uint64_t remaining_slack,
uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp, uint64_t remaining_slack, uint64_t laxity,
uint64_t admissions_estimate, char *previous_function_output, ssize_t output_length)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
@ -95,7 +105,11 @@ sandbox_request_allocate(struct module *module, bool request_from_outside, ssize
sandbox_request->previous_request_length = request_length;
sandbox_request->last_update_timestamp = enqueue_timestamp;
sandbox_request->remaining_slack = remaining_slack;
sandbox_request->laxity = laxity;
/*Avoid pointer suspension*/
sandbox_request->pre_functions_output = NULL;
pthread_spin_init(&sandbox_request->lock, PTHREAD_PROCESS_PRIVATE);
/*
* Admissions Control State
* Assumption: an estimate of 0 should have been interpreted as a rejection
@ -107,3 +121,131 @@ sandbox_request_allocate(struct module *module, bool request_from_outside, ssize
return sandbox_request;
}
/**
* Allocate a new node for the list of previous function outputs. Ensures that
* the list remains sorted based on run_priority, which must be less than 6.
* @param head the head of struct sandbox_request->previous_function_output
* @param output the output of the previous function
* @param output_length the length of the output
* @param run_priority the run_priority of the sandbox->module->run_priority
* The first four bytes of the data represent the length of the data, and the tail character & serves as the delimiter marker
**/
static inline void
pre_functions_output_request_add(struct sandbox_request *request, char *output, ssize_t output_length, uint32_t run_priority)
{
assert(run_priority < 6);
// pthread_spin_lock(&request->lock);
if (!output || output_length <= 0) {
debuglog("output is null or output_length is <= 0");
// goto done;
return;
}
struct sandbox_pre_functions_output **head = &request->pre_functions_output;
struct sandbox_pre_functions_output *new_node = (struct sandbox_pre_functions_output *)malloc(sizeof(struct sandbox_pre_functions_output));
if (!new_node) {
panic("Could not allocate memory for new node");
}
new_node->previous_function_output = (char *)malloc(output_length);
if (!new_node->previous_function_output) {
free(new_node);
panic("Could not allocate memory for output buffer");
}
//memcpy(new_node->previous_function_output, output, output_length);
new_node->previous_function_output = output;
new_node->output_length = output_length;
new_node->run_priority = run_priority;
new_node->next = NULL;
if (*head == NULL || (*head)->run_priority >= run_priority) {
new_node->next = *head;
*head = new_node;
} else {
struct sandbox_pre_functions_output *current = *head;
while (current->next && current->next->run_priority < run_priority) {
current = current->next;
}
new_node->next = current->next;
current->next = new_node;
}
//done:
// pthread_spin_unlock(&request->lock);
}
/**
* The output result of the sandbox in the splicing structure
**/
static inline void
concatenate_outputs(struct sandbox_request *request) {
if(request->pre_functions_output == NULL) {
return;
}
size_t total_length = 0; // Calculate total length without extra for null character
struct sandbox_pre_functions_output *current = request->pre_functions_output;
if(current && !current->next)
{
char *previous_function_output = (char *)malloc(current->output_length);
if (!previous_function_output) {
panic("Could not allocate memory for concatenated output");
return;
}
memcpy(previous_function_output, current->previous_function_output, current->output_length);
request->output_length = current->output_length;
request->previous_function_output = previous_function_output;
return;
}
/* 4 bytes of data represents the length of the data */
while (current != NULL) {
total_length += current->output_length + 4;
current = current->next;
}
char *concatenated_output = (char *)malloc(total_length);
if (!concatenated_output) {
panic("Could not allocate memory for concatenated output");
return;
}
char *copy_dest = concatenated_output;
current = request->pre_functions_output;
while (current != NULL) {
size_t copy_length = current->output_length;
*(uint32_t *)copy_dest = (uint32_t)copy_length;
copy_dest += 4;
memcpy(copy_dest, current->previous_function_output, copy_length);
copy_dest += copy_length;
current = current->next;
}
if (request->previous_function_output != NULL) {
free(request->previous_function_output);
request->previous_function_output = NULL;
}
request->output_length = total_length;
request->previous_function_output = concatenated_output;
/**/
if (request->pre_functions_output != NULL)
{
struct sandbox_pre_functions_output *current = request->pre_functions_output;
struct sandbox_pre_functions_output *next = NULL;
while (current) {
next = current->next;
free(current->previous_function_output);
free(current);
current = next; }
request->pre_functions_output = NULL;
}
pthread_spin_destroy(&request->lock);
}

@ -59,6 +59,9 @@ sandbox_set_as_complete(struct sandbox *sandbox, sandbox_state_t last_state)
/* Terminal State Logging */
sandbox_print_perf(sandbox);
sandbox_mem_print_perf(sandbox);
#ifdef DEEP_LEARN_SCHDUE
sandbox_MDL_print_perf(sandbox);
#endif
sandbox_summarize_page_allocations(sandbox);
/* Do not touch sandbox state after adding to completion queue to avoid use-after-free bugs */

@ -54,6 +54,9 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox->state = SANDBOX_ERROR;
sandbox_print_perf(sandbox);
sandbox_mem_print_perf(sandbox);
#ifdef DEEP_LEARN_SCHDUE
sandbox_MDL_print_perf(sandbox);
#endif
sandbox_summarize_page_allocations(sandbox);
sandbox_free_linear_memory(sandbox);
admissions_control_subtract(sandbox->admissions_estimate);

@ -51,7 +51,8 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand
/* Copy the socket descriptor, address, and arguments of the client invocation */
sandbox->absolute_deadline = sandbox_request->absolute_deadline;
sandbox->remaining_slack = sandbox_request->remaining_slack;
sandbox->remaining_slack = sandbox_request->remaining_slack;
sandbox->laxity = sandbox_request->laxity;
sandbox->last_update_timestamp = sandbox_request->last_update_timestamp;
sandbox->arguments = (void *)sandbox_request->arguments;
sandbox->client_socket_descriptor = sandbox_request->socket_descriptor;

@ -25,11 +25,13 @@ sandbox_set_as_running(struct sandbox *sandbox, sandbox_state_t last_state)
//uint64_t last = sandbox->last_update_timestamp;
//uint64_t last_rs = sandbox->remaining_slack;
sandbox->remaining_slack -= (now - sandbox->last_update_timestamp);
sandbox->laxity -= (now - sandbox->last_update_timestamp);
sandbox->last_update_timestamp = now;
sandbox->runnable_duration += duration_of_last_state;
current_sandbox_set(sandbox);
runtime_worker_threads_deadline[worker_thread_idx] = sandbox->absolute_deadline;
runtime_worker_threads_remaining_slack[worker_thread_idx] = sandbox->remaining_slack;
runtime_worker_threads_laxity[worker_thread_idx] = sandbox->laxity;
//mem_log("time %lu sandbox starts running, request id:%d name %s obj=%p remaining slack %lu, last_rs %lu now %lu last %lu \n", start_execution,
// sandbox->id, sandbox->module->name, sandbox, sandbox->remaining_slack, last_rs, now, last);
/* Does not handle context switch because the caller knows if we need to use fast or slow switched */
@ -49,3 +51,5 @@ sandbox_set_as_running(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox->last_state_change_timestamp = now;
sandbox->state = SANDBOX_RUNNING;
}

@ -54,8 +54,9 @@ struct sandbox {
uint64_t response_timestamp; /* Timestamp when response is sent */
uint64_t completion_timestamp; /* Timestamp when sandbox runs to completion */
uint64_t last_state_change_timestamp; /* Used for bookkeeping of actual execution time */
uint64_t last_update_timestamp; /* Used for bookkeeping timestamp for SRSF */
uint64_t last_update_timestamp; /* Used for bookkeeping timestamp for SRSF && LLF */
uint64_t remaining_slack; /* Cycles */
uint64_t laxity; /* Cycles */
#ifdef LOG_SANDBOX_MEMORY_PROFILE
uint32_t page_allocation_timestamps[SANDBOX_PAGE_ALLOCATION_TIMESTAMP_COUNT];
size_t page_allocation_timestamps_size;

@ -28,7 +28,9 @@ enum SCHEDULER
{
SCHEDULER_FIFO = 0,
SCHEDULER_EDF = 1,
SCHEDULER_SRSF = 2
SCHEDULER_SRSF = 2,
SCHEDULER_MDL = 3,
SCHEDULER_LLF = 4,
};
extern enum SCHEDULER scheduler;
@ -69,6 +71,7 @@ err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
@ -108,8 +111,89 @@ err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
static inline struct sandbox *
scheduler_MDL_get_next()
{
/* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox * local = local_runqueue_get_next();
uint64_t local_remaining_MDL = local == NULL ? UINT64_MAX : local->remaining_slack;
struct sandbox_request *request = NULL;
uint64_t global_remaining_slack = global_request_scheduler_peek();
/* Try to pull and allocate from the global queue if earlier
* This will be placed at the head of the local runqueue */
if (global_remaining_slack < local_remaining_MDL && (local_workload_count <=2 || local_runqueue_count == 0)) {
//if (global_remaining_slack < local_remaining_slack) {
if (global_request_scheduler_remove_if_earlier(&request, local_remaining_MDL) == 0) {
//uint64_t pop_time = __getcycles() - system_start_timestamp;
//mem_log("time %lu remove from GQ, request id:%d name %s remaining slack %lu\n", pop_time,
// request->id, request->module->name, request->remaining_slack);
assert(request != NULL);
struct sandbox *global = sandbox_allocate(request);
if (!global) goto err_allocate;
assert(global->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
}
}
/* Return what is at the head of the local runqueue or NULL if empty */
done:
return local_runqueue_get_next();
err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
static inline struct sandbox *
scheduler_LLF_get_next()
{
/* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox * local = local_runqueue_get_next();
uint64_t local_Laxity = local == NULL ? UINT64_MAX : local->remaining_slack;
struct sandbox_request *request = NULL;
uint64_t global_local_Laxity = global_request_scheduler_peek();
/* Try to pull and allocate from the global queue if earlier
* This will be placed at the head of the local runqueue */
if (global_local_Laxity < local_Laxity && (local_workload_count <=2 || local_runqueue_count == 0)) {
if (global_request_scheduler_remove_if_earlier(&request, local_Laxity) == 0) {
//uint64_t pop_time = __getcycles() - system_start_timestamp;
//mem_log("time %lu remove from GQ, request id:%d name %s remaining slack %lu\n", pop_time,
// request->id, request->module->name, request->remaining_slack);
assert(request != NULL);
struct sandbox *global = sandbox_allocate(request);
if (!global) goto err_allocate;
assert(global->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
}
}
/* Return what is at the head of the local runqueue or NULL if empty */
done:
return local_runqueue_get_next();
err_allocate:
client_socket_send(request->socket_descriptor, 503);
client_socket_close(request->socket_descriptor, &request->socket_address);
free(request);
request = NULL;
goto done;
}
static inline struct sandbox *
scheduler_fifo_get_next()
{
@ -138,6 +222,7 @@ err_allocate:
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor, &sandbox->client_address);
free(sandbox_request);
sandbox_request = NULL;
err:
sandbox = NULL;
goto done;
@ -153,6 +238,10 @@ scheduler_get_next()
return scheduler_srsf_get_next();
case SCHEDULER_FIFO:
return scheduler_fifo_get_next();
case SCHEDULER_MDL:
return scheduler_MDL_get_next();
case SCHEDULER_LLF:
return scheduler_LLF_get_next();
default:
panic("Unimplemented\n");
}
@ -168,6 +257,12 @@ scheduler_initialize()
case SCHEDULER_SRSF:
global_request_scheduler_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_MDL:
global_request_scheduler_minheap_initialize(SCHEDULER_MDL);
break;
case SCHEDULER_LLF:
global_request_scheduler_minheap_initialize(SCHEDULER_LLF);
break;
case SCHEDULER_FIFO:
global_request_scheduler_deque_initialize();
break;
@ -186,6 +281,12 @@ scheduler_runqueue_initialize()
case SCHEDULER_SRSF:
local_runqueue_minheap_initialize(SCHEDULER_SRSF);
break;
case SCHEDULER_MDL:
local_runqueue_minheap_initialize(SCHEDULER_MDL);
break;
case SCHEDULER_LLF:
local_runqueue_minheap_initialize(SCHEDULER_LLF);
break;
case SCHEDULER_FIFO:
local_runqueue_list_initialize();
break;
@ -211,8 +312,8 @@ scheduler_preempt(ucontext_t *user_context)
struct sandbox *current = current_sandbox_get();
assert(current != NULL);
assert(current->state == SANDBOX_RUNNING);
if (current-> remaining_slack <= 5000 * runtime_processor_speed_MHz) {
uint64_t RR_least_time = 5000 * runtime_processor_speed_MHz;
if (current-> remaining_slack <= RR_least_time || current->laxity <= RR_least_time) {
return;
}
/* This is for better state-change bookkeeping */
@ -290,6 +391,10 @@ scheduler_print(enum SCHEDULER variant)
return "EDF";
case SCHEDULER_SRSF:
return "SRSF";
case SCHEDULER_MDL:
return "MDL";
case SCHEDULER_LLF:
return "LLF";
}
}

@ -0,0 +1,14 @@
#pragma once
#include <stdlib.h>
#include "likely.h"
#include "panic.h"
static inline void *
xmalloc(size_t size)
{
void *allocation = malloc(size);
if (unlikely(allocation == NULL)) panic("xmalloc failed!\n");
return allocation;
}

@ -42,6 +42,21 @@ admission_info_get_percentile(struct admissions_info *self)
uint64_t estimated_execution = perf_window_get_percentile(&self->perf_window, self->percentile, self->control_index);
return estimated_execution;
}
#ifdef GET_AVER_TIME
/*
* Get the average execution time of this module, no lock for accessing the queue
* @param self
* @returns the specified execution time of this module
*/
uint64_t
admission_info_get_average(struct admissions_info *self)
{
uint64_t estimated_execution = perf_window_get_average(&self->perf_window);
return estimated_execution;
}
#endif
/*
* Adds an execution value to the perf window and calculates and caches and updated estimate
* @param self

@ -8,9 +8,19 @@
#include "scheduler.h"
#include "module.h"
#include "software_interrupt.h"
#include "map.h"
extern uint64_t system_start_timestamp;
lock_t lock;
#ifdef DEEP_LEARN_SCHDUE
_Atomic uint32_t hash_count = 0;
#endif
#define OUTPUT_BUFER_SIZE 1024*5
__thread struct sandbox *worker_thread_current_sandbox = NULL;
__thread struct sandbox_context_cache local_sandbox_context_cache = {
@ -55,6 +65,13 @@ current_sandbox_disable_preemption(struct sandbox *sandbox)
}
}
static inline void
current_sandbox_get_newlaxity(struct sandbox *sandbox, uint64_t now)
{
assert(sandbox);
sandbox->remaining_slack -= (now - sandbox->last_update_timestamp);
}
/**
* Sandbox execution logic
* Handles setup, request parsing, WebAssembly initialization, function execution, response building and
@ -70,7 +87,30 @@ current_sandbox_start(void)
char *error_message = "";
sandbox_initialize_stdio(sandbox);
struct module * next_module = sandbox->module->next_module;
int next_module_idx = sandbox->module->next_module_count;
static struct hashmap *sandbox_req_map = NULL;
static struct hashmap *sandbox_request_id = NULL;
if (sandbox_req_map == NULL || sandbox_request_id == NULL) {
if(sandbox_req_map == NULL)
{
sandbox_req_map = malloc(sizeof(struct hashmap));
#ifdef DEEP_LEARN_SCHDUE
atomic_init(&hash_count, 0);
#endif
map_init(sandbox_req_map);
}
if(sandbox_request_id == NULL)
{
sandbox_request_id = malloc(sizeof(struct hashmap));
map_init(sandbox_request_id);
}
assert(sandbox_req_map != NULL);
assert(sandbox_request_id != NULL);
}
struct module **next_module = sandbox->module->next_module;
/*
* Add the client fd to epoll if it is the first or last sandbox in the chain because they
@ -118,54 +158,216 @@ current_sandbox_start(void)
*/
goto err;
} else if (next_module != NULL) {
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
assert(next_module_idx);
assert(next_module);
size_t next_module_pre_count = next_module[0]->pre_module_count;
assert(next_module_pre_count);
if (next_module_idx == 1 && next_module_pre_count == 1)
{
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t enqueue_timestamp = __getcycles();
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module, false, sandbox->request_length,
next_module->name, sandbox->client_socket_descriptor,
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
struct module *next_module_node = next_module[0];
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, enqueue_timestamp,
sandbox->remaining_slack, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
#ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) {
free(sandbox_request);
goto err;
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack, sandbox->laxity, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
global_request_scheduler_add(sandbox_request);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
} else if (next_module_idx > 1 && next_module_pre_count == 1)
{
assert(next_module_idx > 1);
for (size_t i = 0; i < next_module_idx; i++)
{
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct module * next_module_node = next_module[i];
assert(next_module_node);
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
#ifdef LOG_DEEP_LEARN_SCHDUE
#ifdef DEEP_LEARN_SCHDUE
/*
* If model parameters need to be trained, then enable the feature;
* otherwise, disable it to avoid unnecessary overhead.
*/
next_module_node->mdl.hash_node = sandbox->module->mdl.hash_node;
next_module_node->mdl.global_queue_node = sandbox->module->mdl.global_queue_node;
next_module_node->mdl.http_data_size = sandbox->module->mdl.http_data_size;
#endif
#endif
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack, sandbox->laxity, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
#ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) {
free(sandbox_request);
goto err;
}
assert(next_sandbox->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(next_sandbox, SANDBOX_INITIALIZED);
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
#else
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
#endif
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
goto done;
} else if (next_module_idx == 1 && next_module_pre_count > 1)
{
static bool lock_flag = true;
if (lock_flag)
{
LOCK_INIT(&lock);
lock_flag = false;
}
/*Before each id is put into the hash table, the key needs to add a "module handle"*/
struct module * next_module_node = next_module[0];
assert(next_module_node);
char *cur_request_id = NULL;
int key_len = snprintf(NULL, 0, "%s%lu", next_module_node->name, sandbox->id) + 1;
cur_request_id = (char *)malloc(key_len);
assert(cur_request_id);
snprintf(cur_request_id, key_len, "%s%lu", next_module_node->name, sandbox->id);
uint32_t ret_value_len;
uint32_t rest_pre_count = 888;
/*calculation the pre_function_out*/
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//debuglog("the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
LOCK_LOCK(&lock);
uint64_t *request_id = (uint64_t *)map_get(sandbox_request_id, cur_request_id, strlen(cur_request_id), &ret_value_len);
bool mapflag = false;
if (!request_id) {
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
#ifdef LOG_DEEP_LEARN_SCHDUE
#ifdef DEEP_LEARN_SCHDUE
/*
* If model parameters need to be trained, then enable the feature;
* otherwise, disable it to avoid unnecessary overhead.
*/
next_module_node->mdl.hash_node = sandbox->module->mdl.hash_node;
next_module_node->mdl.global_queue_node = sandbox->module->mdl.global_queue_node;
next_module_node->mdl.http_data_size = sandbox->module->mdl.http_data_size;
#endif
#endif
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack,sandbox->laxity, true, NULL, 0);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
uint32_t module_pre_count = next_module_pre_count;
module_pre_count--;
assert(module_pre_count);
if(!map_set(sandbox_request_id, cur_request_id, strlen(cur_request_id), &module_pre_count, sizeof(uint32_t), true)) panic("the map of sandbox_request_id is NULL\n");
if(!map_set(sandbox_req_map, cur_request_id, strlen(cur_request_id), sandbox_request, sizeof(struct sandbox_request *), false)) panic("the map of sandbox_request is NULL\n");
mapflag = true;
#ifdef DEEP_LEARN_SCHDUE
atomic_fetch_add(&hash_count, 1);
#endif
}
LOCK_UNLOCK(&lock);
struct sandbox_request *sandbox_request = map_get(sandbox_req_map, cur_request_id, strlen(cur_request_id), &ret_value_len);
if(!sandbox_request) panic("the map of sandbox_request is NULL\n");
if (mapflag)
{
pre_functions_output_request_add(sandbox_request, pre_func_output, output_length, sandbox->module->run_priority);
}else{
pthread_spin_lock(&sandbox_request->lock);
pre_functions_output_request_add(sandbox_request, pre_func_output, output_length, sandbox->module->run_priority);
if (!request_id) {
panic("Request ID not found or invalid\n");
}else {
rest_pre_count = *request_id;
}
if(rest_pre_count == 888) panic("the rest_pre_count is not get requst_id\n");
rest_pre_count--;
if (rest_pre_count != 0)
{
map_upsert(sandbox_request_id, cur_request_id, strlen(cur_request_id), &rest_pre_count, sizeof(uint32_t));
}else{
concatenate_outputs(sandbox_request);
uint64_t enqueue_timestamp = __getcycles();
sandbox_request->enqueue_timestamp = enqueue_timestamp;
global_request_scheduler_add(sandbox_request);
map_delete(sandbox_req_map, cur_request_id, strlen(cur_request_id));
map_delete(sandbox_request_id, cur_request_id, strlen(cur_request_id));
#ifdef DEEP_LEARN_SCHDUE
atomic_fetch_sub(&hash_count, 1);
#endif
}
pthread_spin_unlock(&sandbox_request->lock);
}
free(cur_request_id);
cur_request_id = NULL;
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
goto done;
}else
{
error_message = "the strcuture of DAG is not supported\n";
goto err;
}
} else {
/* Retrieve the result, construct the HTTP response, and send to client */
if (sandbox_send_response(sandbox) < 0) {
@ -173,7 +375,7 @@ current_sandbox_start(void)
goto err;
};
http_total_increment_2xx();
//http_total_increment_2xx();
sandbox->response_timestamp = __getcycles();

@ -91,3 +91,8 @@ global_request_scheduler_peek()
{
return global_request_scheduler.peek_fn();
}
int
global_request_scheduler_size(){
return global_request_scheduler.size_fn();
}

@ -64,6 +64,12 @@ global_request_scheduler_minheap_peek(void)
return priority_queue_peek(global_request_scheduler_minheap);
}
static int
global_request_scheduler_minheap_size(void)
{
return priority_queue_length(global_request_scheduler_minheap);
}
uint64_t
sandbox_request_get_priority_fn(void *element)
{
@ -80,6 +86,25 @@ sandbox_request_get_priority_srsf_fn(void *element)
return remaining_slack;
};
uint64_t
sandbox_request_get_priority_mdl_fn(void *element)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)element;
uint64_t now = __getcycles();
uint64_t remaining_slack_mdl = sandbox_request->remaining_slack - (now - sandbox_request->last_update_timestamp);
return remaining_slack_mdl;
};
uint64_t
sandbox_request_get_priority_llf_fn(void *element)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)element;
uint64_t now = __getcycles();
uint64_t Laxity_llf = sandbox_request->laxity - (now - sandbox_request->last_update_timestamp);
return Laxity_llf;
};
/**
* Initializes the variant and registers against the polymorphic interface
*/
@ -90,13 +115,18 @@ global_request_scheduler_minheap_initialize(enum SCHEDULER scheduler)
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_fn);
} else if (scheduler == SCHEDULER_SRSF) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_srsf_fn);
} else if (scheduler == SCHEDULER_MDL) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_mdl_fn);
} else if (scheduler == SCHEDULER_LLF) {
global_request_scheduler_minheap = priority_queue_initialize(40960, true, sandbox_request_get_priority_llf_fn);
}
struct global_request_scheduler_config config = {
.add_fn = global_request_scheduler_minheap_add,
.remove_fn = global_request_scheduler_minheap_remove,
.remove_if_earlier_fn = global_request_scheduler_minheap_remove_if_earlier,
.peek_fn = global_request_scheduler_minheap_peek
.peek_fn = global_request_scheduler_minheap_peek,
.size_fn = global_request_scheduler_minheap_size
};
global_request_scheduler_initialize(&config);

@ -7,8 +7,15 @@
#include "generic_thread.h"
#include "listener_thread.h"
#include "runtime.h"
#include "scheduler.h"
#ifdef DEEP_LEARN_SCHDUE
extern _Atomic uint32_t hash_count;
#endif
extern uint64_t system_start_timestamp;
extern enum SCHEDULER scheduler;
/*When reading the json file, the size has been determined at module.c JSON_MAX_ELEMENT_COUNT*/
const int QUEUE_SIZE = 36;
/*
* Descriptor of the epoll instance used to monitor the socket descriptors of registered
* serverless modules. The listener cores listens for incoming client requests through this.
@ -100,7 +107,6 @@ listener_thread_main(void *dummy)
*/
assert(descriptor_count > 0);
uint64_t request_arrival_timestamp = __getcycles();
for (int i = 0; i < descriptor_count; i++) {
/* Check Event to determine if epoll returned an error */
if ((epoll_events[i].events & EPOLLERR) == EPOLLERR) {
@ -175,22 +181,76 @@ listener_thread_main(void *dummy)
continue;
}
#ifdef DEEP_LEARN_SCHDUE
/*Get HTTP buffer size*/
int content_length = 0;
module->mdl.global_queue_node = global_request_scheduler_size();
module->mdl.hash_node = hash_count;
char buffer[1024];
ssize_t nbytes_peeked = recv(client_socket, buffer, sizeof(buffer), MSG_PEEK);
if (nbytes_peeked <= 0){
content_length = 5090;
} else{
buffer[sizeof(buffer)-1] = '\0';
const char *content_length_str = "Content-Length: ";
char *start = strstr(buffer, content_length_str);
if (start) {
start += strlen(content_length_str);
while (*start == ' ') start++;
content_length = atoi(start);
//debuglog("Content-Length: %d\n", content_length);
}else{
content_length = 5090;
}
}
module->mdl.http_data_size = content_length;
#endif
/* get total estimated execution time */
uint64_t estimated_execution_time = admission_info_get_percentile(&module->admissions_info);
struct module * next_module = module;
while(next_module) {
estimated_execution_time += admission_info_get_percentile(&next_module->admissions_info);
next_module = next_module->next_module;
}
uint64_t estimated_execution_time = 0;
int front = 0, rear = 0;
struct module *queue[QUEUE_SIZE] = {NULL};
queue[rear++] = module;
while (rear != front)
{
struct module *current_module = queue[front++];
if (scheduler == SCHEDULER_SRSF || scheduler == SCHEDULER_EDF || scheduler == SCHEDULER_LLF)
{
estimated_execution_time += admission_info_get_percentile(&current_module->admissions_info);
}else if (scheduler == SCHEDULER_MDL ){
#ifdef GET_AVER_TIME
estimated_execution_time += admission_info_get_average(&current_module->admissions_info);
#else
estimated_execution_time += admission_info_get_percentile(&current_module->admissions_info);
#endif
}
for (int i = 0; i < current_module->next_module_count; i++) {
if (current_module->next_module[i] != NULL && !current_module->next_module[i]->runtime_visited)
{
queue[rear++] = current_module->next_module[i];
current_module->next_module[i]->runtime_visited = true;
}
}
assert(rear <= QUEUE_SIZE);
assert(front <= QUEUE_SIZE);
}
/*Recover the flags of the module here, so that it can be accessed next time.*/
for (int i = 0; i < QUEUE_SIZE; i++) {
if (queue[i] != NULL) {
struct module *current_module = queue[i];
current_module->runtime_visited = false;
}
}
/* Adding system start timestamp to avoid negative remaining slack in the following update. They are all cycles */
uint64_t remaining_slack = system_start_timestamp + module->relative_deadline - estimated_execution_time;
uint64_t remaining_slack = system_start_timestamp + module->relative_deadline - estimated_execution_time;
uint64_t request_arrival_timestamp = __getcycles();
/* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, true, 0, module->name, client_socket,
sandbox_request_allocate(module, true, 0, module->name, client_socket,
(const struct sockaddr *)&client_address,
request_arrival_timestamp, request_arrival_timestamp,remaining_slack,
request_arrival_timestamp, request_arrival_timestamp,remaining_slack, remaining_slack,
work_admitted, NULL, 0);
/* Add to the Global Sandbox Request Scheduler */

@ -82,6 +82,13 @@ local_runqueue_minheap_initialize(enum SCHEDULER scheduler)
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_priority);
} else if (scheduler == SCHEDULER_SRSF) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_srsf_priority);
} else if (scheduler == SCHEDULER_MDL) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_mdl_priority);
} else if (scheduler == SCHEDULER_LLF) {
local_runqueue_minheap = priority_queue_initialize(10240, false, sandbox_get_llf_priority);
} else{
panic("Invalid scheduler type %d\n", scheduler);
}
/* Register Function Pointers for Abstract Scheduling API */

@ -186,6 +186,11 @@ runtime_configure()
scheduler = SCHEDULER_FIFO;
} else if (strcmp(scheduler_policy, "SRSF") == 0) {
scheduler = SCHEDULER_SRSF;
} else if (strcmp(scheduler_policy, "MDL") == 0) {
scheduler = SCHEDULER_MDL;
} else if (strcmp(scheduler_policy, "LLF") == 0)
{
scheduler = SCHEDULER_LLF;
} else {
panic("Invalid scheduler policy: %s. Must be {EDF|FIFO}\n", scheduler_policy);
}
@ -197,7 +202,7 @@ runtime_configure()
if (strcmp(sigalrm_policy, "BROADCAST") == 0) {
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
} else if (strcmp(sigalrm_policy, "TRIAGED") == 0) {
if (unlikely(scheduler != SCHEDULER_EDF && scheduler != SCHEDULER_SRSF)) panic("triaged sigalrm handlers are only valid with EDF and SRSF\n");
if (unlikely(scheduler != SCHEDULER_EDF && scheduler != SCHEDULER_SRSF && scheduler != SCHEDULER_MDL && scheduler != SCHEDULER_LLF)) panic("triaged sigalrm handlers are only valid with EDF and SRSF\n");
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_TRIAGED;
} else {
panic("Invalid sigalrm policy: %s. Must be {BROADCAST|TRIAGED}\n", sigalrm_policy);
@ -226,8 +231,13 @@ runtime_configure()
printf("\tSandbox Performance Log: %s\n", runtime_sandbox_perf_log_path);
runtime_sandbox_perf_log = fopen(runtime_sandbox_perf_log_path, "w");
if (runtime_sandbox_perf_log == NULL) { perror("sandbox perf log"); }
fprintf(runtime_sandbox_perf_log, "threadid,id,function,state,deadline,actual,queued,initializing,runnable,"
#ifdef LOG_DEEP_LEARN_SCHDUE
fprintf(runtime_sandbox_perf_log, "module.name, total_time,module.hash_node,modul.global_queue_node,module.http_data_size\n");
#else
fprintf(runtime_sandbox_perf_log, "threadid,id,function,state,deadline,actual,queued,initializing,runnable,"
"running,blocked,returned,memory\n");
#endif
} else {
printf("\tSandbox Performance Log: Disabled\n");
}

@ -93,6 +93,7 @@ mem_log(char const * fmt, ...)
} else {
/* Write Success */
log_obj.offset += n;
// dump_log_to_file(log_obj);
}
}

@ -19,6 +19,7 @@
const int JSON_MAX_ELEMENT_COUNT = 16;
const int JSON_MAX_ELEMENT_SIZE = 1024;
const int MODULE_MAX_COUNT = 36;
/*************************
* Private Static Inline *
@ -374,7 +375,10 @@ module_new_from_json(char *file_name)
int module_count = 0;
char *request_headers = NULL;
char *reponse_headers = NULL;
struct module *tail_module = NULL;
struct module **nodes = malloc( MODULE_MAX_COUNT * sizeof(struct module*));
if (nodes == NULL) {
panic("Memory allocation failed for nodes array\n");
}
for (int i = 0; i < total_tokens; i++) {
assert(tokens[i].type == JSMN_OBJECT);
@ -397,21 +401,25 @@ module_new_from_json(char *file_name)
}
memset(reponse_headers, 0, HTTP_MAX_HEADER_LENGTH * HTTP_MAX_HEADER_COUNT);
uint32_t next_module_count = 0;
uint32_t pre_module_count = 0;
int32_t request_size = 0;
int32_t response_size = 0;
int32_t argument_count = 0;
uint32_t priority = 1;
uint32_t port = 0;
uint32_t relative_deadline_us = 0;
uint32_t expected_execution_us = 0;
int admissions_percentile = 50;
bool is_active = false;
bool is_tail_module = false;
// bool is_tail_module = false;
int32_t request_count = 0;
int32_t response_count = 0;
int j = 1;
int ntoks = 2 * tokens[i].size;
char request_content_type[HTTP_MAX_HEADER_VALUE_LENGTH] = { 0 };
char response_content_type[HTTP_MAX_HEADER_VALUE_LENGTH] = { 0 };
char **next_module_names = NULL;
for (; j < ntoks;) {
int ntks = 1;
@ -440,11 +448,38 @@ module_new_from_json(char *file_name)
if (buffer < 0 || buffer > 65535)
panic("Expected port between 0 and 65535, saw %d\n", buffer);
port = buffer;
}else if (strcmp(key, "priority") == 0)
{
priority = atoi(val);
if (priority < 0 || priority > 5)
panic("Expected priority between 0 and 5, saw %d\n", priority);
} else if (strcmp(key, "argsize") == 0) {
// Validate in expected range 0..127. Unclear if 127 is an actual hard limit
argument_count = atoi(val);
if (argument_count < 0 || argument_count > 127)
panic("Expected argument count between 0 and 127, saw %d\n", argument_count);
} else if (strcmp(key, "pre_module_count") == 0)
{
pre_module_count = atoi(val);
if (pre_module_count < 0)
panic("Expected pre_module_count to be nonnegative, saw %d\n", pre_module_count);
} else if (strcmp(key, "next_modules") == 0)
{
assert(tokens[i + j + 1].type == JSMN_ARRAY);
request_count = tokens[i + j + 1].size;
ntks += request_count;
ntoks += request_count;
next_module_names = malloc(request_count * sizeof(char *));
next_module_count = request_count;
int array_index = 0;
for (int k = 1; k <= request_count; k++) {
jsmntok_t *g = &tokens[i + j + k + 1];
int name_length = g->end - g->start;
next_module_names[array_index] = malloc(name_length + 1);
strncpy(next_module_names[array_index], file_buffer + g->start, name_length);
next_module_names[array_index][name_length] = '\0';
array_index++;
}
} else if (strcmp(key, "active") == 0) {
assert(tokens[i + j + 1].type == JSMN_PRIMITIVE);
if (val[0] == 't') {
@ -454,15 +489,15 @@ module_new_from_json(char *file_name)
} else {
panic("Expected active key to be a JSON boolean, was %s\n", val);
}
} else if (strcmp(key, "tail-module") == 0) {
assert(tokens[i + j + 1].type == JSMN_PRIMITIVE);
if (val[0] == 't') {
is_tail_module = true;
} else if (val[0] == 'f') {
is_tail_module = false;
} else {
panic("Expected tail_module key to be a JSON boolean, was %s\n", val);
}
// } else if (strcmp(key, "tail-module") == 0) {
// assert(tokens[i + j + 1].type == JSMN_PRIMITIVE);
// if (val[0] == 't') {
// is_tail_module = true;
// } else if (val[0] == 'f') {
// is_tail_module = false;
// } else {
// panic("Expected tail_module key to be a JSON boolean, was %s\n", val);
// }
} else if (strcmp(key, "relative-deadline-us") == 0) {
int64_t buffer = strtoll(val, NULL, 10);
if (buffer < 0 || buffer > (int64_t)RUNTIME_RELATIVE_DEADLINE_US_MAX)
@ -566,28 +601,92 @@ module_new_from_json(char *file_name)
relative_deadline_us, port, request_size, response_size,
admissions_percentile, expected_execution_us);
if (module == NULL) goto module_new_err;
if(next_module_count == 0)
{
module->next_module_names = NULL;
}else
{
module->next_module_names = malloc(next_module_count * sizeof(char*));
if (module->next_module_names == NULL) panic("Failed to allocate memory for next_module_names");
for (int i = 0; i < next_module_count; i++) {
module->next_module_names[i] = strdup(next_module_names[i]);
if (module->next_module_names[i] == NULL) {
panic("Memory allocation failed for next_module_names[%d].\n", i);
}
}
}
module->next_module_count = next_module_count;
module->pre_module_count = pre_module_count;
module->next_module = NULL;
module->pre_module = NULL;
module->runtime_visited = false;
module->run_priority = priority;
#ifdef DEEP_LEARN_SCHDUE
module->mdl.hash_node = 0;
module->mdl.global_queue_node = 0;
module->mdl.http_data_size = 0;
#endif
assert(module);
if (tail_module != NULL) { tail_module->next_module = module; }
tail_module = module;
tail_module->next_module = NULL;
/* if this is the tail module, reset tail_module to NULL to build another new chain */
if (is_tail_module) {
tail_module = NULL;
}
module_set_http_info(module, request_count, request_headers, request_content_type,
response_count, reponse_headers, response_content_type);
nodes[module_count] = module;
module_count++;
}
free(request_headers);
free(reponse_headers);
for (int i = 0; i < next_module_count; i++) {
free(next_module_names[i]);
}
free(next_module_names);
}
if (module_count == 0) panic("%s contained no active modules\n", file_name);
for (int i = 0; i < module_count; i++) {
assert(nodes[i]);
uint32_t next_module_count = nodes[i]->next_module_count;
if (next_module_count == 0) continue;
nodes[i]->next_module = (struct module**) malloc( next_module_count * sizeof(struct module*));
if (nodes[i]->next_module == NULL) panic("Failed to allocate memory for next_module");
for (int next_module_mem = 0; next_module_mem < next_module_count; next_module_mem++) nodes[i]->next_module[next_module_mem] = NULL;
for (int j = 0; j < next_module_count; j++) {
for (int m = i + 1; m < module_count; m++) {
if (strncmp(nodes[i]->next_module_names[j], nodes[m]->name, MODULE_MAX_NAME_LENGTH) == 0) {
assert(nodes[m]);
uint32_t precount = nodes[m]->pre_module_count;
assert(precount);
if (nodes[m]->pre_module == NULL) {
nodes[m]->pre_module = (struct module**) malloc(precount * sizeof(struct module*));
if (nodes[m]->pre_module == NULL) panic("Failed to allocate memory for pre_module");
for (int k = 0; k < precount; k++) {
nodes[m]->pre_module[k] = NULL;
}
}
nodes[i]->next_module[j] = nodes[m];
int preflag = 0;
for (; preflag < precount; preflag++) {
if(nodes[m]->pre_module[preflag] == NULL) break;
}
nodes[m]->pre_module[preflag] = nodes[i];
break;
}
}
}
}
/*Avoid module memory copy overhead*/
for(int i = 0; i < module_count; i++) {
for (int j = 0; j < nodes[i]->next_module_count; j++)
{
free(nodes[i]->next_module_names[j]);
}
nodes[i]->next_module_names = NULL;
}
free(nodes);
nodes = NULL;
#ifdef LOG_MODULE_LOADING
debuglog("Loaded %d module%s!\n", module_count, module_count > 1 ? "s" : "");
#endif

@ -33,6 +33,7 @@ int runtime_worker_threads_argument[RUNTIME_WORKER_THREAD_CORE_COUNT] = {
/* The active deadline of the sandbox running on each worker thread */
uint64_t runtime_worker_threads_deadline[RUNTIME_WORKER_THREAD_CORE_COUNT] = { UINT64_MAX };
uint64_t runtime_worker_threads_remaining_slack[RUNTIME_WORKER_THREAD_CORE_COUNT] = { UINT64_MAX };
uint64_t runtime_worker_threads_laxity[RUNTIME_WORKER_THREAD_CORE_COUNT] = { UINT64_MAX };
/******************************************
* Shared Process / Listener Thread Logic *
@ -41,7 +42,7 @@ uint64_t runtime_worker_threads_remaining_slack[RUNTIME_WORKER_THREAD_CORE_COUNT
void
runtime_cleanup()
{
if (runtime_sandbox_perf_log != NULL) fflush(runtime_sandbox_perf_log);
//if (runtime_sandbox_perf_log != NULL) fflush(runtime_sandbox_perf_log);
software_interrupt_deferred_sigalrm_max_print();
exit(EXIT_SUCCESS);

@ -151,7 +151,6 @@ sandbox_allocate(struct sandbox_request *sandbox_request)
/* Set state to initializing */
sandbox_set_as_initialized(sandbox, sandbox_request, now);
free(sandbox_request);
done:
return sandbox;
@ -186,6 +185,7 @@ sandbox_free(struct sandbox *sandbox)
assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE);
int rc;
//printf("ID %lu of sandbox %s removing sandbox from queue\n",sandbox->id, sandbox->module->name);
if (sandbox->previous_function_output != NULL) {
free(sandbox->previous_function_output);
sandbox->previous_function_output = NULL;

@ -94,6 +94,16 @@ sigalrm_propagate_workers(siginfo_t *signal_info)
uint64_t global_slack = global_request_scheduler_peek();
if (global_slack < local_remaining_slack) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
} else if (scheduler == SCHEDULER_MDL) {
uint64_t local_remaining_slack = runtime_worker_threads_remaining_slack[i];
uint64_t global_slack = global_request_scheduler_peek();
if (global_slack < local_remaining_slack) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
} else if (scheduler == SCHEDULER_LLF) {
uint64_t local_Laxity = runtime_worker_threads_laxity[i];
uint64_t global_Laxity = global_request_scheduler_peek();
if (global_Laxity < local_Laxity) pthread_kill(runtime_worker_threads[i], SIGALRM);
continue;
}
}
case RUNTIME_SIGALRM_HANDLER_BROADCAST: {
@ -253,12 +263,12 @@ done:
void
software_interrupt_arm_timer(void)
{
if (!runtime_preemption_enabled) return;
/* if preemption disabled, broadcast sig alarm to all other threads to record the queuelength info */
/* if preemption disabled, broadcast sig alarm to all other threads to record the queuelength info */
if (!runtime_preemption_enabled) {
runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
return;
}
struct itimerval interval_timer;
memset(&interval_timer, 0, sizeof(struct itimerval));

@ -2,16 +2,17 @@ include Makefile.inc
#TESTS=fibonacci fibonacci2 fibonacci3 big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS=fibonacci big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS2=fibonacciadd mem work3 picinpic noop fibonacci2 fibchain
TESTSRT=$(TESTS:%=%_rt)
.PHONY: all clean rttests tinyekf cifar10 gocr sod
TESTSRT2=$(TESTS2:%=%_rt)
.PHONY: all clean rttests tinyekf cifar10 gocr sod add
all: rttests tinyekf cifar10 gocr sod
@echo "Test Compilation done!"
rttests: $(TESTSRT)
add: $(TESTSRT2)
clean:
@echo "Cleaning Test Applications"
@ -42,6 +43,7 @@ sod:
@make dir samples.so -C ./sod/
@cp ./sod/bin/license_plate_detection.so ${SLEDGE_BIN_DIR}/lpd_wasm.so
@cp ./sod/bin/resize_image.so ${SLEDGE_BIN_DIR}/resize_wasm.so
# @cp ./sod/bin/reverse.so ${SLEDGE_BIN_DIR}/reverse_wasm.so
C-Image-Manip:
@echo "Making and Installing pngPlay"

File diff suppressed because one or more lines are too long

@ -0,0 +1 @@
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

@ -0,0 +1,15 @@
#!/bin/bash
function usage {
echo "$0 [cpu-log]"
exit 1
}
path="/home/hai/sledge-old/runtime/tests"
#test work1k 2000
server_log_file="edf_1k.log"
$path/compare/start_compare.sh $server_log_file >/dev/null 2>&1 &
echo "sledge is running"
./hey_test_compare.sh 60 22
$path/kill_sledge.sh

File diff suppressed because one or more lines are too long

@ -0,0 +1,24 @@
function usage {
echo "$0 [duration(s)] [concurrency]"
exit 1
}
if [ $# != 2 ] ; then
usage
exit 1;
fi
duration=$1
concurrency=$2
f1="1byte_"$concurrency".txt"
echo $f1
f2="0.4k_"$concurrency".txt"
./test_8c.sh $f1 $duration $concurrency 1byte_file 10000 2>&1 &
pid1=$!
./test_8c.sh $f2 $duration $concurrency 410byte_file 10004 2>&1 &
pid4=$!
wait -f $pid1
wait -f $pid2
printf "[OK]\n"

@ -0,0 +1,29 @@
#!/bin/bash
function usage {
echo "$0 [perf output file, chain_function_perf.log or single_function_perf.log or opt_function_perf.log]"
exit 1
}
if [ $# != 1 ] ; then
usage
exit 1;
fi
output=$1
declare project_path="$(
cd "$(dirname "$0")/../../.."
pwd
)"
echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2400
export SLEDGE_SCHEDULER=EDF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SANDBOX_PERF_LOG=$path/$output
echo $SLEDGE_SANDBOX_PERF_LOG
cd $project_path/runtime/bin
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/compare/test_work1k.json

@ -0,0 +1,22 @@
function usage {
echo "$0 [output file] [duration(s)] [concurrency] [image file] [port]"
exit 1
}
if [ $# != 5 ] ; then
echo "input parameters are not 5"
usage
exit 1;
fi
output=$1
duration=$2
concurrency=$3
image=$4
port=$5
echo "hey -disable-compression -disable-keepalive -disable-redirects -c $concurrency -z $duration\s -t 0 -m GET -D "$image" "http://127.0.0.1:$port" > $output"
#hey -disable-compression -disable-keepalive -disable-redirects -c 1 -q $rps -z $duration\s -cpus 1 -t 0 -m GET -D "$image" "http://10.10.1.1:$port"
#hey -disable-compression -disable-keepalive -disable-redirects -c $concurrency -z 20s -t 0 -m GET -D "$image" "http://10.10.1.1:$port"
hey -disable-compression -disable-keepalive -disable-redirects -n 5000 -c $concurrency -z $duration\s -t 0 -m GET -D "$image" "http://127.0.0.1:$port" > $output

@ -0,0 +1,136 @@
{
"active": true,
"name": "work1k1_1",
"path": "work1k_wasm.so",
"port": 10000,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work1k1_2", "work1k1_3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k1_2",
"path": "work1k_wasm.so",
"port": 10001,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work1k1_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k1_3",
"path": "work1k_wasm.so",
"port": 10002,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 2,
"pre_module_count": 1,
"next_modules": ["work1k1_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k1_4",
"path": "work1k_wasm.so",
"port": 10003,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain",
},
{
"active": true,
"name": "work1k2_1",
"path": "work1k_wasm.so",
"port": 10004,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work1k2_2", "work1k2_3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k2_2",
"path": "work1k_wasm.so",
"port": 10005,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work1k2_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k2_3",
"path": "work1k_wasm.so",
"port": 10006,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 2,
"pre_module_count": 1,
"next_modules": ["work1k2_4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work1k2_4",
"path": "work1k_wasm.so",
"port": 10007,
"relative-deadline-us": 90000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1200,
"http-resp-headers": [],
"http-resp-size": 1200,
"http-resp-content-type": "text/plaini",
},

@ -0,0 +1,85 @@
{
"active": true,
"name": "fibona1",
"path": "fibchain_wasm.so",
"port": 10000,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["fibona2"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona2",
"path": "fibchain_wasm.so",
"port": 10002,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona3",
"path": "fibchain_wasm.so",
"port": 10003,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona4",
"path": "fibchain_wasm.so",
"port": 10004,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona5"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona5",
"path": "fibchain_wasm.so",
"port": 10005,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}

@ -0,0 +1,20 @@
#include <stdio.h>
#include <stdlib.h>
unsigned long int
fib(unsigned long int n)
{
if (n <= 1) return n;
return fib(n - 1) + fib(n - 2);
}
int
main(int argc, char **argv)
{
unsigned long n = 0;
scanf("%lu", &n);
n= fib(n);
printf("%lu\n", n);
return 0;
}

@ -1,40 +1,30 @@
#include <stdio.h>
#include <stdlib.h>
// #include "get_time.h"
unsigned long int
fib(unsigned long int n)
{
if (n <= 1) return n;
return fib(n - 1) + fib(n - 2);
}
/*
int
main(int argc, char **argv)
{
unsigned long r = 0;
//scanf("%s", recv_buf);
r = fib(30);
printf("%lu\n", r);
return 0;
}*/
int
main(int argc, char **argv)
{
//char * recv_buf = malloc(1024 * 1024);
char recv_buf[1024 * 1024] = {0};
//memset(recv_buf, 0, 1024 * 1024);
unsigned long r = 0;
//scanf("%s", recv_buf);
r = read(0, recv_buf, 1024 * 1024);
unsigned long n = 0;
scanf("%lu", &n);
//size_t rd = read(0, recv_buf, 1000*1024);
//if (rd <= 0) return -1;
// unsigned long long st = get_time(), en;
//r = fib(30);
// en = get_time();
printf("%lu\n", r);
for (int i = 0; i < 3; i++)
{
n = fib(n);
}
printf("%lu\n", n);
// print_time(st, en);
return 0;
}

@ -0,0 +1,99 @@
#include "dag_data_split.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct DataNode {
uint32_t dataLength;
char *data;
struct DataNode *next;
};
DataNode* splitData(char *buffer, uint32_t bufferSize) {
DataNode *head = NULL;
DataNode *tail = NULL;
uint32_t offset = 0;
while (offset < bufferSize) {
if (offset + 4 > bufferSize) {
break;
}
uint32_t dataLength = *(uint32_t *)(buffer + offset);
offset += 4;
if (offset + dataLength > bufferSize) {
break;
}
DataNode *newNode = (DataNode *)malloc(sizeof(DataNode));
if (newNode == NULL) {
perror("Memory allocation failed");
freeDataNodes(head); // 释放已分配的节点内存
return NULL;
}
newNode->data = (char *)malloc(dataLength);
if (newNode->data == NULL) {
free(newNode);
perror("Memory allocation failed");
freeDataNodes(head); // 释放已分配的节点内存
return NULL;
}
memcpy(newNode->data, buffer + offset, dataLength);
newNode->dataLength = dataLength;
newNode->next = NULL;
if (head == NULL) {
head = newNode;
} else {
tail->next = newNode;
}
tail = newNode;
offset += dataLength;
}
return head;
}
void freeDataNodes(DataNode *head) {
while (head != NULL) {
DataNode *next = head->next;
free(head->data);
free(head);
head = next;
}
}
void printDataList(DataNode *head) {
int index = 0;
DataNode *current = head;
while (current != NULL) {
printf("Data %d: Length = %u\n", index, current->dataLength);
index++;
current = current->next;
}
}
int getDataNodeCount(DataNode *head) {
int count = 0;
DataNode *current = head;
while (current != NULL) {
count++;
current = current->next;
}
return count;
}
const char* getDataNodeByIndex(DataNode *head, int index) {
int count = 1;
DataNode *current = head;
while (current != NULL) {
if (count == index) {
return current->data;
}
count++;
current = current->next;
}
return NULL; // 如果索引超出范围返回NULL
}

@ -0,0 +1,17 @@
#pragma once
#include <stdint.h>
typedef struct DataNode DataNode;
DataNode* splitData(char *buffer, uint32_t bufferSize);
void freeDataNodes(DataNode *head);
void printDataList(DataNode *head);
int getDataNodeCount(DataNode *head);
/**
* @param index is form 1 to n
*/
const char* getDataNodeByIndex(DataNode *head, int index);

@ -0,0 +1,59 @@
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include "dag_data_split.h"
unsigned long int fib(unsigned long int n) {
if (n <= 1) return n;
return fib(n - 1) + fib(n - 2);
}
int main() {
char buffer[1024]; // Buffer to store input data
ssize_t bytes_read = read(0, buffer, sizeof(buffer));
if (bytes_read < 0) {
perror("Error reading input");
return 1;
}
DataNode *dataList = splitData(buffer, bytes_read);
if (dataList == NULL) {
fprintf(stderr, "Failed to split data.\n");
return 1;
}
unsigned long int num1, num2;
const char *firstdata = getDataNodeByIndex(dataList, 1);
const char *seconddata = getDataNodeByIndex(dataList, 2);
if (firstdata == NULL || seconddata == NULL) {
fprintf(stderr, "Not enough data for two numbers.\n");
freeDataNodes(dataList);
return 1;
}
if (sscanf(firstdata, "%lu", &num1) != 1 || sscanf(seconddata, "%lu", &num2) != 1) {
fprintf(stderr, "Failed to parse the numbers correctly.\n");
freeDataNodes(dataList);
return 1;
}
unsigned long int fib1 = fib(num1);
unsigned long int fib2 = fib(num2);
unsigned long int sum = fib1 + fib2;
// Prepare output string
char output[1024];
int len = snprintf(output, sizeof(output), "Fibonacci(%lu) + Fibonacci(%lu) = %lu + %lu = %lu\n", num1, num2, fib1, fib2, sum);
// Write to stdout
write(1, output, len);
// <20><><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD>
freeDataNodes(dataList);
return 0;
}

@ -0,0 +1,17 @@
{
"active": true,
"name": "fibona1",
"path": "fibonacci2_wasm.so",
"port": 10000,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}

@ -0,0 +1,68 @@
{
"active": true,
"name": "work",
"path": "work_wasm.so",
"port": 10000,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work2", "work3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work2",
"path": "work_wasm.so",
"port": 10001,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work3",
"path": "work_wasm.so",
"port": 10002,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work4",
"path": "fibonacciadd_wasm.so",
"port": 10003,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
}

@ -25,9 +25,9 @@ pid2=$!
pid3=$!
./test_8c.sh $f4 $duration $concurrency 40k.jpg 10009 2>&1 &
pid4=$!
wait -f $pid1
wait -f $pid2
wait -f $pid3
wait -f $pid4
wait $pid1
wait $pid2
wait $pid3
wait $pid4
printf "[OK]\n"

@ -0,0 +1,46 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#define OUTPUT_BUFFER_SIZE (1024 * 5) // 5KB
int main() {
char *data = (char *)malloc(10 * 1024); // Allocate 10KB
if (!data) {
fprintf(stderr, "Failed to allocate memory.\n");
return 1;
}
ssize_t bytesRead = read(STDIN_FILENO, data, 10 * 1024);
if (bytesRead == -1) {
fprintf(stderr, "Failed to read data from file.\n");
free(data);
return 1;
}
// Create a combined buffer to hold the output
char *combined_output = malloc(2 * OUTPUT_BUFFER_SIZE + 50); // 50 extra bytes for text and safety
if (!combined_output) {
fprintf(stderr, "Failed to allocate memory for combined output.\n");
free(data);
return 1;
}
// Format the output in a single buffer
int offset = sprintf(combined_output, "Buffer 1 (First 5KB): ");
memcpy(combined_output + offset, data, OUTPUT_BUFFER_SIZE);
offset += OUTPUT_BUFFER_SIZE;
offset += sprintf(combined_output + offset, " Buffer 2 (Second 5KB): ");
memcpy(combined_output + offset, data + OUTPUT_BUFFER_SIZE, OUTPUT_BUFFER_SIZE);
// Print everything in one go
fwrite(combined_output, sizeof(char), offset + OUTPUT_BUFFER_SIZE, stdout);
printf("\n");
// Clean up
free(data);
free(combined_output);
return 0;
}

@ -0,0 +1,26 @@
import sys
def process_file(input_file, noop_functions):
data = {noop: [] for noop in noop_functions}
with open(input_file, "r") as infile:
for line in infile:
for noop in noop_functions:
if noop in line:
value = line.split(",")[6]
data[noop].append(value)
for noop, values in data.items():
with open(f"{noop}.txt", "w") as outfile:
outfile.write("\n".join(values))
if __name__ == "__main__":
noop_functions = ["noop1", "noop2", "noop3", "noop4", "noop5"]
argv = sys.argv[1:]
if len(argv) < 1:
print("usage:", sys.argv[0], "file_dir percentage")
sys.exit()
input_file = argv[0]
process_file(input_file, noop_functions)

@ -0,0 +1,11 @@
#include <stdio.h>
void noop() {
}
int main() {
noop();
return 0;
}

@ -0,0 +1,22 @@
import os
def calculate_average(filename):
with open(filename, "r") as file:
values = file.readlines()
values = [int(value.strip()) for value in values]
average = sum(values) / len(values) if values else 0
return average
def main():
noop_functions = ["noop1", "noop2", "noop3", "noop4", "noop5"]
for noop in noop_functions:
filename = f"{noop}.txt"
if os.path.exists(filename):
average = calculate_average(filename)
print(f"Average for {filename}: {average}")
else:
print(f"{filename} does not exist.")
if __name__ == "__main__":
main()

@ -36,7 +36,7 @@ def get_values(key, value, miss_deadline_rate, total_latency, running_times, pre
before_dot = file_name.split(".")[0]
joint_f_name = before_dot + "_total_time.txt"
cmd='python3 ~/sledge-serverless-framework/runtime/tests/meet_deadline_percentage.py %s 50' % file_name
cmd='python3 /home/hai/sledge/sledge/runtime/tests/meet_deadline_percentage.py %s 50' % file_name
rt=os.popen(cmd).read().strip()
cmd2='mv total_time.txt %s' % joint_f_name
os.popen(cmd2)

@ -0,0 +1,111 @@
#include "dag_split_image.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct DataNode {
uint32_t dataLength;
unsigned char *data;
struct DataNode *next;
};
DataNode* splitData(unsigned char *buffer, uint32_t bufferSize) {
DataNode *head = NULL;
DataNode *tail = NULL;
uint32_t offset = 0;
while (offset < bufferSize) {
if (offset + 4 > bufferSize) {
break;
}
uint32_t dataLength = *(uint32_t *)(buffer + offset);
offset += 4;
if (offset + dataLength > bufferSize) {
break;
}
DataNode *newNode = (DataNode *)malloc(sizeof(DataNode));
if (newNode == NULL) {
perror("Memory allocation failed");
freeDataNodes(head);
return NULL;
}
newNode->data = (unsigned char *)malloc(dataLength);
if (newNode->data == NULL) {
free(newNode);
perror("Memory allocation failed");
freeDataNodes(head);
return NULL;
}
memcpy(newNode->data, buffer + offset, dataLength);
newNode->dataLength = dataLength;
newNode->next = NULL;
if (head == NULL) {
head = newNode;
} else {
tail->next = newNode;
}
tail = newNode;
offset += dataLength;
}
return head;
}
void freeDataNodes(DataNode *head) {
while (head != NULL) {
DataNode *next = head->next;
free(head->data);
free(head);
head = next;
}
}
void printDataList(DataNode *head) {
int index = 0;
DataNode *current = head;
while (current != NULL) {
printf("Data %d: Length = %u\n", index, current->dataLength);
index++;
current = current->next;
}
}
int getDataNodeCount(DataNode *head) {
int count = 0;
DataNode *current = head;
while (current != NULL) {
count++;
current = current->next;
}
return count;
}
unsigned char* getDataNodeByIndex(DataNode *head, int index) {
int count = 1;
DataNode *current = head;
while (current != NULL) {
if (count == index) {
return current->data;
}
count++;
current = current->next;
}
return NULL;
}
uint32_t getImageDataSize(DataNode *head, int index){
int count = 1;
DataNode *current = head;
while (current != NULL) {
if (count == index) {
return current->dataLength;
}
count++;
current = current->next;
}
return 0;
}

@ -0,0 +1,18 @@
#pragma once
#include <stdint.h>
typedef struct DataNode DataNode;
DataNode* splitData(unsigned char *buffer, uint32_t bufferSize);
void freeDataNodes(DataNode *head);
void printDataList(DataNode *head);
int getDataNodeCount(DataNode *head);
/**
* @param index is form 1 to n
*/
unsigned char* getDataNodeByIndex(DataNode *head, int index);
uint32_t getImageDataSize(DataNode *head, int index);

@ -0,0 +1,79 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "dag_split_image.h"
#define MAX_IMG_SZ 8*1025*1024
int main()
{
unsigned char *zInpbuf = NULL;
zInpbuf = malloc(MAX_IMG_SZ);
if (!zInpbuf)
{
perror("malloc");
return -1;
}
ssize_t imgSz = read(0, zInpbuf, MAX_IMG_SZ);
if (imgSz <= 0)
{
perror("read");
free(zInpbuf);
return -1;
}
DataNode *dataList = splitData(zInpbuf, imgSz);
if (dataList == NULL) {
fprintf(stderr, "Failed to split data.\n");
return 1;
}
unsigned char *Imagedata1 = getDataNodeByIndex(dataList, 1);
unsigned char *Imagedata2 = getDataNodeByIndex(dataList, 2);
uint32_t imageSize1 = getImageDataSize(dataList, 1);
uint32_t imageSize2 = getImageDataSize(dataList, 2);
int x, y;
FILE *out_bin = stdout;
for (int i = 0; i < 54; i++)
{
x = Imagedata1[i];
y = (i < imageSize2) ? Imagedata2[i] : x;
fwrite(&x, 1, 1, out_bin);
}
int i = 0;
unsigned long big_img_offset = 54;
unsigned long small_img_offset = 54;
while (big_img_offset < imageSize1)
{
if (i == 2400) i = 0;
x = Imagedata1[big_img_offset++];
if (i < 300 && small_img_offset < imageSize2)
{
y = Imagedata2[small_img_offset++];
fwrite(&y, 1, 1, out_bin);
}
else
{
fwrite(&x, 1, 1, out_bin);
}
i++;
}
while (big_img_offset < imageSize1)
{
x = Imagedata1[big_img_offset++];
fwrite(&x, 1, 1, out_bin);
}
free(zInpbuf);
return 0;
}

@ -2703,7 +2703,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.1.undefined"
}
},
"nbformat": 4,

@ -1 +1,2 @@
sudo chsh -s /bin/bash xiaosuGW
sudo chsh -s /bin/bash hai

@ -20,10 +20,10 @@ declare project_path="$(
echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=3300
#export SLEDGE_SCHEDULER=SRSF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_CPU_SPEED=3300
export SLEDGE_SCHEDULER=EDF
#export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=1
export SLEDGE_SCHEDULER=EDF
export SLEDGE_SANDBOX_PERF_LOG=$path/$output
@ -37,7 +37,8 @@ cd $project_path/runtime/bin
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/mulitple_linear_chain.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing3.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing4.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing4.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json

@ -21,23 +21,19 @@ echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2400
export SLEDGE_SCHEDULER=SRSF
export SLEDGE_SIGALRM_HANDLER=BROADCAST
export SLEDGE_SCHEDULER=EDF
export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=1
#export SLEDGE_SCHEDULER=EDF
export SLEDGE_SANDBOX_PERF_LOG=$path/$output
echo $SLEDGE_SANDBOX_PERF_LOG
cd $project_path/runtime/bin
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_big_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_armcifar10.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_png2bmp.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_image_processing.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/mulitple_linear_chain.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing3.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing4.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_fibonacci.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/my_sodresize.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph2.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/fibc.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_dag_image.json

@ -0,0 +1,32 @@
#!/bin/bash
function usage {
echo "$0 [perf output file, chain_function_perf.log or single_function_perf.log or opt_function_perf.log]"
exit 1
}
if [ $# != 1 ] ; then
usage
exit 1;
fi
output=$1
declare project_path="$(
cd "$(dirname "$0")/../.."
pwd
)"
echo $project_path
path=`pwd`
#export SLEDGE_DISABLE_PREEMPTION=true
export SLEDGE_CPU_SPEED=2500
export SLEDGE_SCHEDULER=FIFO
#xport SLEDGE_SIGALRM_HANDLER=BROADCAST
#export SLEDGE_SIGALRM_HANDLER=TRIAGED
#export SLEDGE_NWORKERS=16
#export SLEDGE_SCHEDULER=EDF
export SLEDGE_SANDBOX_PERF_LOG=$path/$output
echo $SLEDGE_SANDBOX_PERF_LOG
cd $project_path/runtime/bin
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_noop1.json

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save