Fix the redundant second copy of data flowing through a chain, and the redundant second copy of data fanned out from a node

sledge_graph
hwwang 2 weeks ago
parent 8990f5c5ed
commit ed80c30ccf
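For context on the commit message above: the change is about copying a finished sandbox's output bytes only once on their way into each downstream request, both along a straight chain and when a node fans out to several successors. The snippet below is a minimal, self-contained C sketch of the single-copy fan-out idea under that reading; enqueue_successor_request() and the buffer layout are illustrative stand-ins, not SLEdge's actual API.

/* fanout_copy_sketch.c -- one malloc + one memcpy per successor, taken directly
 * from the producing sandbox's response region, instead of staging the output
 * in an intermediate buffer and copying it a second time per successor.
 * All names below are illustrative, not the runtime's. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for handing a per-successor buffer to a new request. */
static void
enqueue_successor_request(size_t successor_idx, char *output, size_t length)
{
	printf("successor %zu gets %zu bytes: %.*s\n", successor_idx, length, (int)length, output);
	free(output); /* the real runtime would free this when the request is consumed */
}

int
main(void)
{
	const char *response_data  = "HEADERfib(30)=832040"; /* request bytes followed by output bytes */
	size_t      request_length = 6;                      /* length of the "HEADER" prefix */
	size_t      output_length  = strlen(response_data) - request_length;
	size_t      fanout         = 3;                      /* number of next modules */

	for (size_t i = 0; i < fanout; i++) {
		/* One allocation and one copy per successor, straight from the source buffer. */
		char *per_successor = malloc(output_length);
		if (!per_successor) {
			perror("malloc");
			return 1;
		}
		memcpy(per_successor, response_data + request_length, output_length);
		enqueue_successor_request(i, per_successor, output_length);
	}
	return 0;
}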

@ -0,0 +1,41 @@
# -*- coding: UTF-8 -*-
import matplotlib.pyplot as plt

def plot_max_rps_comparison():
    # Data from the previous description
    sizes = ['5KB', '40KB', '105KB', '305KB']
    max_rps_before = [121, 85, 63, 33]
    max_rps_after = [607, 235, 128, 45]
    # Index for each bar position along the x-axis
    index = range(len(sizes))
    bar_width = 0.35  # width of the bars
    # Setting up the plot
    fig, ax = plt.subplots()
    ax.set_facecolor('#f0f0f0')
    # Creating bars for "Mixed Task"
    rects1 = ax.bar(index, max_rps_before, bar_width, label='Mixed Task', color='orange')
    # Creating bars for "Single Task", shifted to the right by `bar_width`
    rects2 = ax.bar([p + bar_width for p in index], max_rps_after, bar_width, label='Single Task', color='skyblue')
    # Adding labels and title
    ax.set_xlabel('Image Size (KB)')
    ax.set_ylabel('MAX RPS')
    ax.set_title('Mixed and Single Task Performance Comparison')
    # Setting the position of the x-ticks to be in the middle of the grouped bars
    ax.set_xticks([p + bar_width / 2 for p in index])
    ax.set_xticklabels(sizes)
    # Adding a legend to distinguish the two groups of bars
    ax.legend()
    # Displaying the plot
    plt.show()

# Call the function to display the plot
plot_max_rps_comparison()

@ -13,30 +13,30 @@ def load_data(filename):
def main():
    # Load the data
    edf_data = load_data('edf_5k.txt')
    llf_data = load_data('llf_5k.txt')
    edf_data = [0.83, 1.35, 1.88, 2.36, 1.9]
    llf_data = [0.45, 0.4, 0.52, 0.97, 0.9]
    # Data points for the x-axis
    x_labels = [50, 60, 70, 80, 90, 100]  # make sure the data matches these labels
    x_labels = [50, 60, 70, 80, 90]  # make sure the data matches these labels
    font_properties = fm.FontProperties(family='Times New Roman', size=18)
    plt.rcParams.update({'font.size': 18, 'font.family': 'Times New Roman'})
    # Create the figure and plot the data
    plt.figure(figsize=(10, 6))
    plt.figure(figsize=(10, 5))
    ax = plt.gca()  # get the current Axes object
    ax.set_facecolor('#f0f0f0')  # light gray background
    plt.plot(x_labels, edf_data, marker='s', linestyle='-', color='#C8503D', markersize=8, label='EDF')
    plt.plot(x_labels, llf_data, marker='^', linestyle='-', color='#00008B', markersize=8, label='LLF')
    # Add the title, axis labels, and legend
    plt.title('5KB-1.2* Deadline', fontsize=20, fontproperties=font_properties)
    plt.title('5KB-1.5*Deadline', fontsize=20, fontproperties=font_properties)
    plt.xlabel('Load (% of maximum RPS)', fontproperties=font_properties)
    plt.ylabel('Deadline Miss Rate (%)', fontproperties=font_properties)
    plt.legend(prop=font_properties)
    # Set the x-axis ticks
    plt.xticks(range(50, 101, 10))
    plt.xticks(range(50, 91, 10))
    # Enable the grid
    plt.grid(True)

@ -35,7 +35,6 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand
sandbox->state = SANDBOX_SET_AS_INITIALIZED;
sandbox->request_from_outside = sandbox_request->request_from_outside;
concatenate_outputs(sandbox_request);
sandbox->previous_function_output = sandbox_request->previous_function_output;
sandbox->output_length = sandbox_request->output_length;
sandbox->previous_request_length = sandbox_request->previous_request_length;
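The hunk above replaces the direct previous_function_output pointer hand-off with a call to concatenate_outputs(sandbox_request). A plausible reading, sketched below, is that the predecessors' outputs are merged into one contiguous buffer only when the sandbox is actually initialized, so the bytes are copied once at that point rather than once more per hand-off. The struct layout and field names in this sketch are guesses based on fields visible elsewhere in this diff, not the runtime's definitions.

/* Sketch of what concatenate_outputs() plausibly does: walk the request's
 * pre_functions_output list, sum the lengths, and build a single contiguous
 * buffer. Field names are assumptions. */
#include <stdlib.h>
#include <string.h>

struct pre_output_node {
	char                   *previous_function_output;
	size_t                  output_length;
	struct pre_output_node *next;
};

struct chained_request_sketch {
	struct pre_output_node *pre_functions_output;     /* filled by the fan-in path */
	char                   *previous_function_output; /* concatenated result */
	size_t                  output_length;
};

static int
concatenate_outputs_sketch(struct chained_request_sketch *req)
{
	size_t total = 0;
	for (struct pre_output_node *n = req->pre_functions_output; n; n = n->next)
		total += n->output_length;
	if (total == 0) return 0;

	char *buffer = malloc(total);
	if (!buffer) return -1;

	size_t offset = 0;
	for (struct pre_output_node *n = req->pre_functions_output; n; n = n->next) {
		memcpy(buffer + offset, n->previous_function_output, n->output_length);
		offset += n->output_length;
	}
	req->previous_function_output = buffer;
	req->output_length            = total;
	return 0;
}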

@ -162,9 +162,8 @@ current_sandbox_start(void)
assert(next_module);
size_t next_module_pre_count = next_module[0]->pre_module_count;
assert(next_module_pre_count);
if (next_module_idx > 1 || (next_module_idx == 1 && next_module_pre_count == 1))
if (next_module_idx == 1 && next_module_pre_count == 1)
{
/* Generate a new request, copy the current sandbox's output into the next request's buffer, and add it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
@ -173,20 +172,41 @@ current_sandbox_start(void)
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
struct module *next_module_node = next_module[0];
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack, sandbox->laxity, true, pre_func_output, output_length);
/* TODO: All sandboxes in the chain share the same request id, but sandbox_request_allocate()
 * busy-waits to generate a unique id; should we optimize this here?
 */
sandbox_request->id = sandbox->id;
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
global_request_scheduler_add(sandbox_request);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
} else if (next_module_idx > 1 && next_module_pre_count == 1)
{
assert(next_module_idx > 1);
for (size_t i = 0; i < next_module_idx; i++)
{
/* Generate a new request, copy the current sandbox's output into the next request's buffer, and add it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct module * next_module_node = next_module[i];
assert(next_module_node);
char * individual_pre_func_output = (char *)malloc(output_length);
if (!individual_pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the individual previous output: %s\n", strerror(errno));
free(pre_func_output);
goto err;
}
memcpy(individual_pre_func_output, pre_func_output, output_length);
uint64_t now = __getcycles();
current_sandbox_get_newlaxity(sandbox, now);
#ifdef LOG_DEEP_LEARN_SCHDUE
@ -205,12 +225,11 @@ current_sandbox_start(void)
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, now,
sandbox->remaining_slack, sandbox->laxity, true, NULL, 0);
sandbox->remaining_slack, sandbox->laxity, true, pre_func_output, output_length);
/* TODO: All sandboxes in the chain share the same request id, but sandbox_request_allocate()
 * busy-waits to generate a unique id; should we optimize this here?
 */
sandbox_request->id = sandbox->id;
pre_functions_output_request_add(sandbox_request, individual_pre_func_output, output_length, sandbox->module->run_priority);
#ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
@ -236,10 +255,8 @@ current_sandbox_start(void)
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
/* Free pre_func_output: its contents have already been deep-copied into each request's buffer. */
free(pre_func_output);
pre_func_output = NULL;
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
goto done;
} else if (next_module_idx == 1 && next_module_pre_count > 1)
{
static bool lock_flag = true;
@ -327,6 +344,7 @@ current_sandbox_start(void)
{
map_upsert(sandbox_request_id, cur_request_id, strlen(cur_request_id), &rest_pre_count, sizeof(uint32_t));
} else {
concatenate_outputs(sandbox_request);
uint64_t enqueue_timestamp = __getcycles();
sandbox_request->enqueue_timestamp = enqueue_timestamp;
global_request_scheduler_add(sandbox_request);

@ -151,16 +151,17 @@ sandbox_allocate(struct sandbox_request *sandbox_request)
/* Set state to initializing */
sandbox_set_as_initialized(sandbox, sandbox_request, now);
if (sandbox_request->pre_functions_output != NULL)
{
struct sandbox_pre_functions_output *current = sandbox_request->pre_functions_output;
struct sandbox_pre_functions_output *next = NULL;
while (current) {
next = current->next;
free(current->previous_function_output);
free(current);
current = next;
}
sandbox_request->pre_functions_output = NULL;
}
pthread_spin_destroy(&sandbox_request->lock);
free(sandbox_request);
done:
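For readers following the fan-in bookkeeping that the cleanup loop above tears down, here is a self-contained sketch of what a node of that pre_functions_output list and the pre_functions_output_request_add() call seen earlier in the diff might look like. Only the previous_function_output and next fields are actually visible in the diff; the priority-ordered insertion and the use of the request's spinlock are assumptions inferred from the run_priority argument and the pthread_spin_destroy() call.

/* Sketch of the per-predecessor output list freed by sandbox_allocate() above.
 * The priority field and ordered insert are assumptions based on the
 * run_priority argument passed to pre_functions_output_request_add(). */
#include <pthread.h>
#include <stdlib.h>

struct pre_functions_output_sketch {
	char                                *previous_function_output;
	size_t                               output_length;
	int                                  run_priority;
	struct pre_functions_output_sketch *next;
};

struct fanin_request_sketch {
	struct pre_functions_output_sketch *pre_functions_output;
	pthread_spinlock_t                   lock; /* assumed initialized with pthread_spin_init() at allocation */
};

/* Append one predecessor's output, keeping the list ordered by priority. */
static int
pre_functions_output_add_sketch(struct fanin_request_sketch *req, char *output,
                                size_t length, int run_priority)
{
	struct pre_functions_output_sketch *node = malloc(sizeof(*node));
	if (!node) return -1;
	node->previous_function_output = output;
	node->output_length            = length;
	node->run_priority             = run_priority;

	pthread_spin_lock(&req->lock);
	struct pre_functions_output_sketch **pp = &req->pre_functions_output;
	while (*pp && (*pp)->run_priority <= run_priority) pp = &(*pp)->next;
	node->next = *pp;
	*pp        = node;
	pthread_spin_unlock(&req->lock);
	return 0;
}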

@ -2,7 +2,7 @@ include Makefile.inc
#TESTS=fibonacci fibonacci2 fibonacci3 big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS=fibonacci big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS2=fibonacciadd mem work3 picinpic noop
TESTS2=fibonacciadd mem work3 picinpic noop fibonacci2 fibchain
TESTSRT=$(TESTS:%=%_rt)
TESTSRT2=$(TESTS2:%=%_rt)

@ -0,0 +1,85 @@
{
"active": true,
"name": "fibona1",
"path": "fibchain_wasm.so",
"port": 10000,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["fibona2"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona2",
"path": "fibchain_wasm.so",
"port": 10002,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona3",
"path": "fibchain_wasm.so",
"port": 10003,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona4",
"path": "fibchain_wasm.so",
"port": 10004,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["fibona5"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibona5",
"path": "fibchain_wasm.so",
"port": 10005,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}

@ -0,0 +1,20 @@
#include <stdio.h>
#include <stdlib.h>
unsigned long int
fib(unsigned long int n)
{
if (n <= 1) return n;
return fib(n - 1) + fib(n - 2);
}
int
main(int argc, char **argv)
{
unsigned long n = 0;
scanf("%lu", &n);
n = fib(n);
printf("%lu\n", n);
return 0;
}

@ -1,40 +1,30 @@
#include <stdio.h>
#include <stdlib.h>
// #include "get_time.h"
unsigned long int
fib(unsigned long int n)
{
if (n <= 1) return n;
return fib(n - 1) + fib(n - 2);
}
/*
int
main(int argc, char **argv)
{
unsigned long r = 0;
//scanf("%s", recv_buf);
r = fib(30);
printf("%lu\n", r);
return 0;
}*/
int
main(int argc, char **argv)
{
//char * recv_buf = malloc(1024 * 1024);
char recv_buf[1024 * 1024] = {0};
//memset(recv_buf, 0, 1024 * 1024);
unsigned long r = 0;
//scanf("%s", recv_buf);
r = read(0, recv_buf, 1024 * 1024);
unsigned long n = 0;
scanf("%lu", &n);
//size_t rd = read(0, recv_buf, 1000*1024);
//if (rd <= 0) return -1;
// unsigned long long st = get_time(), en;
//r = fib(30);
// en = get_time();
printf("%lu\n", r);
for (int i = 0; i < 3; i++)
{
n = fib(n);
}
printf("%lu\n", n);
// print_time(st, en);
return 0;
}

@ -0,0 +1,17 @@
{
"active": true,
"name": "fibona1",
"path": "fibonacci2_wasm.so",
"port": 10000,
"relative-deadline-us": 18000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}

@ -32,7 +32,8 @@ cd $project_path/runtime/bin
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/graph.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph2.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_multiple_image_processing_graph2.json
LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/fibc.json
#LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH" ./sledgert ../tests/test_dag_image.json

@ -5,36 +5,76 @@ function usage {
exit 1
}
picture=$1
scheduler=$2
#chmod 400 ./id_rsa
#path="/home/weihao/sledge/sledge_tree/runtime/tests"
##path="/home/weihao/sledge/sledge_tree/runtime/tests"
path="/home/njl/sledge/runtime/tests"
#test single 5k c5 50% max RPS (480)
f1="5k_single_50.txt"
server_log_file="execution_single_5k_50.log"
$path/start.sh $server_log_file >/dev/null 2>&1 &
echo "sledge is running"
./test_rps.sh $f1 120 48 5k.jpg 10000 2>&1 &
##test single $picture c5 50% max RPS (480)
f1="${picture}_single_50.txt"
server_log_file="execution_single_${picture}_50.log"
ssh -o stricthostkeychecking=no njl@10.16.31.135 "$path/start.sh $server_log_file >/dev/null 2>&1 &"
./test_rps.sh $f1 30 4 $picture.jpg 10000 2>&1 &
pid1=$!
wait $pid1
$path/kill_sledge.sh
ssh -t -o stricthostkeychecking=no njl@10.16.31.135 "$path/kill_sledge.sh"
#test single 5k c5 70% max RPS
f1="5k_single_70.txt"
server_log_file="execution_single_5k_70.log"
$path/start.sh $server_log_file >/dev/null 2>&1 &
echo "sledge is running"
./test_rps.sh $f1 120 68 5k.jpg 10000 2>&1 &
pid1=$!
wait $pid1
$path/kill_sledge.sh
#test single 5k c5 99% max RPS
f1="5k_single_99.txt"
server_log_file="execution_single_5k_99.log"
$path/start.sh $server_log_file >/dev/null 2>&1 &
echo "sledge is running"
./test_rps.sh $f1 120 96 5k.jpg 10000 2>&1 &
pid1=$!
wait $pid1
$path/kill_sledge.sh
# f1="${picture}_single_60.txt"
# server_log_file="execution_single_${picture}_60.log"
# $path/start.sh $server_log_file >/dev/null 2>&1 &
# echo "sledge is running"
# ./test_rps.sh $f1 30 5 $picture.jpg 10000 2>&1 &
# pid1=$!
# wait $pid1
# $path/kill_sledge.sh
# #test single $picture c5 70% max RPS
# f1="${picture}_single_70.txt"
# server_log_file="execution_single_${picture}_70.log"
# $path/start.sh $server_log_file >/dev/null 2>&1 &
# echo "sledge is running"
# ./test_rps.sh $f1 30 6 $picture.jpg 10000 2>&1 &
# pid1=$!
# wait $pid1
# $path/kill_sledge.sh
# f1="${picture}_single_80.txt"
# server_log_file="execution_single_${picture}_80.log"
# $path/start.sh $server_log_file >/dev/null 2>&1 &
# echo "sledge is running"
# ./test_rps.sh $f1 30 7 $picture.jpg 10000 2>&1 &
# pid1=$!
# wait $pid1
# $path/kill_sledge.sh
# f1="${picture}_single_90.txt"
# server_log_file="execution_single_${picture}_90.log"
# $path/start.sh $server_log_file >/dev/null 2>&1 &
# echo "sledge is running"
# ./test_rps.sh $f1 30 8 $picture.jpg 10000 2>&1 &
# pid1=$!
# wait $pid1
# $path/kill_sledge.sh
# #test single $picture c5 99% max RPS
# f1="${picture}_single_99.txt"
# server_log_file="execution_single_${picture}_99.log"
# $path/start.sh $server_log_file >/dev/null 2>&1 &
# echo "sledge is running"
# ./test_rps.sh $f1 30 9 $picture.jpg 10000 2>&1 &
# pid1=$!
# wait $pid1
# $path/kill_sledge.sh
# mv ${picture}_single_50.txt ~/meet_deadline/${picture}_llf/
# mv ${picture}_single_60.txt ~/meet_deadline/${picture}_llf/
# mv ${picture}_single_70.txt ~/meet_deadline/${picture}_llf/
# mv ${picture}_single_80.txt ~/meet_deadline/${picture}_llf/
# mv ${picture}_single_90.txt ~/meet_deadline/${picture}_llf/
# mv ${picture}_single_99.txt ~/meet_deadline/${picture}_llf/
# rm *.txt
# python3 parse_parse_python.py $picture

@ -12,7 +12,7 @@ fi
echo "current_rps(*5) add_step(*5) duratime"
#path="/home/njl/sledge/runtime/tests"
path="/home/hai/sledge/sledge/runtime/tests"
path="/home/njl/sledge/runtime/tests"
current_rps=$1
step=$2
@ -27,14 +27,13 @@ server_log_file="test_rps.log"
loop=1
for loop in {1..5}; do
$path/start-edf.sh $server_log_file >/dev/null 2>&1 &
ssh -o stricthostkeychecking=no njl@10.16.31.135 "$path/start.sh $server_log_file >/dev/null 2>&1 &"
echo "sledge is running loop $loop"
./test_rps.sh $output $duratime $current_rps 5k.jpg 10000 2>&1 &
pid1=$!
wait $pid1
$path/kill_sledge.sh
ssh -t -o stricthostkeychecking=no njl@10.16.31.135 "$path/kill_sledge.sh"
latency=$(grep "Requests" $output | awk -F ': ' '{print $2}')
if (( $(echo "$latency < $max_rps" | bc -l) )); then

@ -16,6 +16,6 @@ port=$5
echo "hey test"
hey -disable-compression -disable-keepalive -disable-redirects -c 5 -q $rps -z $duration\s -t 0 -m GET -D "$image" "http://127.0.0.1:$port" > $output
hey -disable-compression -disable-keepalive -disable-redirects -c 5 -q $rps -z $duration\s -t 0 -m GET -D "$image" "http://10.16.31.135:$port" > $output
#loadtest -c 5 --rps $rps -t $duration --method GET --data @$image "http://10.16.109.192:$port" > $output
#hey -disable-compression -disable-keepalive -disable-redirects -c 8 -q 50 -z $duration\s -t 0 -m GET -D "$image" "http://10.10.1.1:$port" > $output