修改数据拷贝方式,关于50%的响应获取问题还有待商榷

sledge_graph
hwwang 4 months ago
parent 0f9b75d784
commit bae018314f

@ -107,7 +107,11 @@
"xmalloc.h": "c",
"stddef.h": "c",
"__mutex_base": "c",
"memory": "c"
"memory": "c",
"atomic": "c",
"condition_variable": "c",
"ostream": "c",
"stop_token": "c"
},
"files.exclude": {
"**/.git": true,

@ -1 +0,0 @@
10

@ -1,26 +1,46 @@
Summary:
Total: 5.0283 secs
Slowest: 0.0000 secs
Fastest: 0.0000 secs
Average: NaN secs
Requests/sec: 51018.3313
Total: 10.1989 secs
Slowest: 0.2733 secs
Fastest: 0.0142 secs
Average: 0.2073 secs
Requests/sec: 571.9239
Total data: 268318 bytes
Size/request: 46 bytes
Response time histogram:
0.014 [1] |
0.040 [9] |
0.066 [15] |
0.092 [13] |
0.118 [13] |
0.144 [14] |
0.170 [15] |
0.196 [827] |■■■■■■■■
0.222 [3956] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.247 [943] |■■■■■■■■■■
0.273 [27] |
Latency distribution:
10% in 0.1929 secs
25% in 0.1991 secs
50% in 0.2068 secs
75% in 0.2167 secs
90% in 0.2276 secs
95% in 0.2334 secs
99% in 0.2433 secs
Details (average, fastest, slowest):
DNS+dialup: NaN secs, 0.0000 secs, 0.0000 secs
DNS-lookup: NaN secs, 0.0000 secs, 0.0000 secs
req write: NaN secs, 0.0000 secs, 0.0000 secs
resp wait: NaN secs, 0.0000 secs, 0.0000 secs
resp read: NaN secs, 0.0000 secs, 0.0000 secs
DNS+dialup: 0.0004 secs, 0.0142 secs, 0.2733 secs
DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
req write: 0.0001 secs, 0.0000 secs, 0.0164 secs
resp wait: 0.2063 secs, 0.0050 secs, 0.2712 secs
resp read: 0.0001 secs, 0.0000 secs, 0.0148 secs
Status code distribution:
[200] 5833 responses
Error distribution:
[256536] Post "http://127.0.0.1:10000": dial tcp 127.0.0.1:10000: connect: connection refused

@ -83,6 +83,8 @@ struct module {
char **next_module_names; /* the next modules name in the DAG */
uint32_t next_module_count;
uint32_t pre_module_count;
bool runtime_visited;
uint32_t run_priority;
};
/*************************

@ -156,8 +156,6 @@ current_sandbox_start(void)
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//printf("这是图一分多 the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
@ -208,6 +206,9 @@ current_sandbox_start(void)
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
/*free memory of pre_func_out, Because it has been deeply copied its copy into requestbecause */
free(pre_func_output);
pre_func_output = NULL;
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
}else if (next_module_idx == 1 && next_module_pre_count > 1)
{
@ -217,7 +218,6 @@ current_sandbox_start(void)
LOCK_INIT(&lock);
lock_flag = false;
}
bool hash_flag = false;
/*Before each id is put into the hash table, the key needs to add a "module handle"*/
struct module * next_module_node = next_module[0];
assert(next_module_node);
@ -226,14 +226,8 @@ current_sandbox_start(void)
cur_request_id = (char *)malloc(key_len);
assert(cur_request_id);
snprintf(cur_request_id, key_len, "%s%lu", next_module_node->name, sandbox->id);
debuglog("the cur_request_id is %s\n", cur_request_id);
uint32_t ret_value_len;
LOCK_LOCK(&lock);
//printf("锁住了\n");
uint64_t *requet_id = (uint64_t *)map_get(sandbox_request_id, cur_request_id, strlen(cur_request_id), &ret_value_len);
if(!requet_id) hash_flag = true;
if (hash_flag) {
//it means that the first sandbox is calculated, and it needs to wait for the return value of other sandboxes.
/*calculation the pre_function_out*/
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
@ -241,12 +235,13 @@ current_sandbox_start(void)
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
//printf("创建sandbox_request 模块工作结果 the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
//debuglog("the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
LOCK_LOCK(&lock);
uint64_t *requet_id = (uint64_t *)map_get(sandbox_request_id, cur_request_id, strlen(cur_request_id), &ret_value_len);
if (!requet_id) {
uint64_t enqueue_timestamp = __getcycles();
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
@ -258,7 +253,7 @@ current_sandbox_start(void)
*/
sandbox_request->id = sandbox->id;
uint32_t module_pre_count = next_module[0]->pre_module_count;
uint32_t module_pre_count = next_module_pre_count;
module_pre_count--;
assert(module_pre_count);
map_set(sandbox_request_id, cur_request_id, strlen(cur_request_id), &module_pre_count, sizeof(uint32_t), true);
@ -270,28 +265,23 @@ current_sandbox_start(void)
{
uint32_t rest_pre_count = *requet_id;
assert(rest_pre_count >= 1);
struct sandbox_request *sandbox_request = map_get(sandbox_req_map, cur_request_id, strlen(cur_request_id), &ret_value_len);
assert(sandbox_request);
// Copy data into pre_func_output
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char *pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
free(pre_func_output);
goto err;
}
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
debuglog("这是合并sandbox_request模块工作结果 the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
ssize_t new_output_length = sandbox_request->output_length + output_length + 1;
/*ssize_t new_output_length = sandbox_request->output_length + output_length + 1;
char *new_output = (char *)malloc(new_output_length);
if (!new_output) {
fprintf(stderr, "Failed to allocate memory for the new output: %s\n", strerror(errno));
goto err;
}
memset(new_output, 0, new_output_length);
if (sandbox->module->run_priority == 1)
{
snprintf(new_output, new_output_length, "%s&%s", sandbox_request->previous_function_output, pre_func_output);
}else{
snprintf(new_output, new_output_length, "%s&%s", pre_func_output, sandbox_request->previous_function_output);
}
if(sandbox_request->previous_function_output != NULL)
{
free(sandbox_request->previous_function_output);
@ -299,10 +289,37 @@ current_sandbox_start(void)
}
assert(new_output);
sandbox_request->previous_function_output = new_output;
debuglog("这是合并sandbox_request 模块工作结果 the ID %lu %s new_output is %s\n", sandbox->id, sandbox->module->name, new_output);
debuglog("the ID %lu %s the merge_output is %s\n", sandbox->id, sandbox->module->name, new_output);
free(pre_func_output);
pre_func_output = NULL;
sandbox_request->output_length = new_output_length;*/
ssize_t new_output_length = sandbox_request->output_length + output_length;
char *new_output = (char *)malloc(new_output_length);
if (!new_output) {
fprintf(stderr, "Failed to allocate memory for the new output: %s\n", strerror(errno));
goto err;
}
memset(new_output, 0, new_output_length);
if (sandbox->module->run_priority == 1)
{
memcpy(new_output, pre_func_output, output_length - 1);
new_output[output_length - 1] = '&';
memcpy(new_output + output_length, sandbox_request->previous_function_output, sandbox_request->output_length);
}else{
memcpy(new_output, sandbox_request->previous_function_output, sandbox_request->output_length-1);
new_output[sandbox_request->output_length-1] = '&';
memcpy(new_output + sandbox_request->output_length + 1, pre_func_output, output_length);
}
if(sandbox_request->previous_function_output != NULL)
{
free(sandbox_request->previous_function_output);
sandbox_request->previous_function_output = NULL;
}
//debuglog("the ID %lu %s the merge_output is %s\n", sandbox->id, sandbox->module->name, new_output);
sandbox_request->previous_function_output = new_output;
sandbox_request->output_length = new_output_length;
free(pre_func_output);
pre_func_output = NULL;
rest_pre_count--;
if (rest_pre_count != 0)
@ -319,9 +336,7 @@ current_sandbox_start(void)
free(cur_request_id);
cur_request_id = NULL;
}
//printf("解锁");
LOCK_UNLOCK(&lock);
//printf("没有死索\n");
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}

@ -189,12 +189,17 @@ listener_thread_main(void *dummy)
{
struct module *current_module = queue[front++];
estimated_execution_time += admission_info_get_percentile(&current_module->admissions_info);
debuglog("Estimated execution time for module %s is %lu\n", current_module->name, estimated_execution_time);
for (int i = 0; i < current_module->next_module_count; i++) {
if (current_module->next_module[i] != NULL)
if (current_module->next_module[i] != NULL && !current_module->next_module[i]->runtime_visited)
{
queue[rear++] = current_module->next_module[i];
current_module->next_module[i]->runtime_visited = true;
}
}
/*Recover the flags of the module here, so that it can be accessed next time.*/
current_module->runtime_visited = false;
assert(rear <= QUEUE_SIZE);
assert(front <= QUEUE_SIZE);
}

@ -407,6 +407,7 @@ module_new_from_json(char *file_name)
int32_t request_size = 0;
int32_t response_size = 0;
int32_t argument_count = 0;
uint32_t priority = 1;
uint32_t port = 0;
uint32_t relative_deadline_us = 0;
uint32_t expected_execution_us = 0;
@ -448,6 +449,11 @@ module_new_from_json(char *file_name)
if (buffer < 0 || buffer > 65535)
panic("Expected port between 0 and 65535, saw %d\n", buffer);
port = buffer;
}else if (strcmp(key, "priority") == 0)
{
priority = atoi(val);
if (priority < 0 || priority > 5)
panic("Expected priority between 0 and 5, saw %d\n", priority);
} else if (strcmp(key, "argsize") == 0) {
// Validate in expected range 0..127. Unclear if 127 is an actual hard limit
argument_count = atoi(val);
@ -613,6 +619,8 @@ module_new_from_json(char *file_name)
module->pre_module_count = pre_module_count;
module->next_module = NULL;
module->pre_module = NULL;
module->runtime_visited = false;
module->run_priority = priority;
assert(module);
module_set_http_info(module, request_count, request_headers, request_content_type,
@ -664,6 +672,7 @@ module_new_from_json(char *file_name)
}
}
free(nodes);
nodes = NULL;
#ifdef LOG_MODULE_LOADING
debuglog("Loaded %d module%s!\n", module_count, module_count > 1 ? "s" : "");
#endif

@ -2,7 +2,7 @@ include Makefile.inc
#TESTS=fibonacci fibonacci2 fibonacci3 big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS=fibonacci big_fibonacci C-Image-Manip empty work work1k work10k work100k work1m forever filesys sockserver sockclient empty
TESTS2=fibonacciadd
TESTS2=fibonacciadd mem
TESTSRT=$(TESTS:%=%_rt)
TESTSRT2=$(TESTS2:%=%_rt)

Binary file not shown.

@ -20,8 +20,12 @@ int main() {
}
buffer[bytes_read] = '\0';
unsigned long int num1, num2;
// Remove potential newline character at the end of the input
if (bytes_read > 0 && (buffer[bytes_read - 1] == '\n' || buffer[bytes_read - 1] == '\r')) {
buffer[bytes_read - 1] = '\0';
}
unsigned long int num1, num2;
char *line = strtok(buffer, "&");
char *second_part = strtok(NULL, "&"); // Assume the rest of the string is the second number
@ -39,8 +43,10 @@ int main() {
// Write to stdout
write(1, output, len);
} else {
const char *error_msg = "Invalid input. Please enter two numbers separated by .\n";
write(1, error_msg, strlen(error_msg));
const char *error_msg = "Invalid input. Please enter two numbers separated by '&'. Your input was: ";
char output_buffer[2048]; // Buffer to hold the error message and user input
int len = snprintf(output_buffer, sizeof(output_buffer), "%s%s\n", error_msg, buffer);
write(1, output_buffer, len);
return 1;
}

@ -5,6 +5,7 @@
"port": 10000,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 0,
"next_modules": ["work2", "work3"],
"http-req-headers": [],
@ -21,6 +22,7 @@
"port": 10001,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
@ -37,6 +39,7 @@
"port": 10002,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
@ -53,6 +56,7 @@
"port": 10003,
"relative-deadline-us": 50000,
"argsize": 1,
"priority": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],

@ -0,0 +1,46 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#define OUTPUT_BUFFER_SIZE (1024 * 5) // 5KB
int main() {
char *data = (char *)malloc(10 * 1024); // Allocate 10KB
if (!data) {
fprintf(stderr, "Failed to allocate memory.\n");
return 1;
}
ssize_t bytesRead = read(STDIN_FILENO, data, 10 * 1024);
if (bytesRead == -1) {
fprintf(stderr, "Failed to read data from file.\n");
free(data);
return 1;
}
// Create a combined buffer to hold the output
char *combined_output = malloc(2 * OUTPUT_BUFFER_SIZE + 50); // 50 extra bytes for text and safety
if (!combined_output) {
fprintf(stderr, "Failed to allocate memory for combined output.\n");
free(data);
return 1;
}
// Format the output in a single buffer
int offset = sprintf(combined_output, "Buffer 1 (First 5KB): ");
memcpy(combined_output + offset, data, OUTPUT_BUFFER_SIZE);
offset += OUTPUT_BUFFER_SIZE;
offset += sprintf(combined_output + offset, " Buffer 2 (Second 5KB): ");
memcpy(combined_output + offset, data + OUTPUT_BUFFER_SIZE, OUTPUT_BUFFER_SIZE);
// Print everything in one go
fwrite(combined_output, sizeof(char), offset + OUTPUT_BUFFER_SIZE, stdout);
printf("\n");
// Clean up
free(data);
free(combined_output);
return 0;
}

File diff suppressed because one or more lines are too long

@ -0,0 +1,44 @@
Runtime Environment:
CPU Speed: 2400 MHz
Processor Speed: 2400 MHz
RLIMIT_DATA: Infinite
RLIMIT_NOFILE: 1048576 (Increased from 8192)
Core Count: 8
Listener core ID: 1
First Worker core ID: 2
Worker core count: 6
Scheduler Policy: EDF
Sigalrm Policy: BROADCAST
Preemption: Enabled
Quantum: 5000 us
Sandbox Performance Log: /home/hai/sledge-old/runtime_sandbox_perf_log.log
Starting listener thread
Listener core thread: 7ffff7a006c0
Starting 6 worker thread(s)
C: 01, T: 0x7ffff7bfdd80, F: runtime_start_runtime_worker_threads>
Sandboxing environment ready!
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
C: 03, T: 0x7ffff66006c0, F: current_sandbox_start>
the ID 0 work2 pre_func_output is 10
C: 02, T: 0x7ffff70006c0, F: current_sandbox_start>
the ID 0 work3 pre_func_output is 10
C: 02, T: 0x7ffff70006c0, F: current_sandbox_start>
the ID (the output need to be merged) 0 work3 pre_func_output is 10
C: 02, T: 0x7ffff70006c0, F: current_sandbox_start>
the ID 0 work3 the merge_output is 10&10
Loading…
Cancel
Save