已添加DAG,需要修改各部份时间测试代码

newsch
hwwang 5 months ago
parent 70fdbb348c
commit 3aeed6f94b

@ -70,6 +70,27 @@
"ignoreFailures": true
}
]
},
{
"name": "utest",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/runtime/utest/map",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"sourceFileMap": {},
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"envFile": "${workspaceFolder}/.env",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
}
]
}

@ -103,7 +103,9 @@
"map.h": "c",
"stdio.h": "c",
"hashmap.h": "c",
"mutex": "cpp"
"mutex": "cpp",
"xmalloc.h": "c",
"stddef.h": "c"
},
"files.exclude": {
"**/.git": true,

@ -16,11 +16,12 @@
struct map_node {
struct map_node *next;
char *key;
void *key;
void *value;
uint32_t key_len;
uint32_t value_len;
uint32_t hash;
bool manage_mvalue;
};
struct map_bucket {
@ -39,16 +40,17 @@ map_init(struct hashmap *restrict map)
map->buckets[i].head = NULL;
LOCK_INIT(&map->buckets[i].lock);
}
};
}
/* See https://en.wikipedia.org/wiki/Jenkins_hash_function */
static inline uint32_t
jenkins_hash(char *key, uint32_t key_len)
jenkins_hash(void *key, uint32_t key_len)
{
uint32_t i = 0;
uint32_t hash = 0;
unsigned char *data = (unsigned char *)key;
while (i != key_len) {
hash += key[i++];
hash += data[i++];
hash += hash << 10;
hash ^= hash >> 6;
}
@ -59,7 +61,7 @@ jenkins_hash(char *key, uint32_t key_len)
}
static inline void *
map_get(struct hashmap *map, char *key, uint32_t key_len, uint32_t *ret_value_len)
map_get(struct hashmap *map, void *key, uint32_t key_len, uint32_t *ret_value_len)
{
void *value = NULL;
@ -84,13 +86,15 @@ DONE:
}
static inline bool
map_set(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len)
map_set(struct hashmap *map, void *key, uint32_t key_len, void *value, uint32_t value_len, bool manage_mvalue)
{
bool did_set = false;
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
LOCK_LOCK(&bucket->lock);
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash) goto DONE;
}
@ -99,14 +103,18 @@ map_set(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t
*(new_node) = (struct map_node){ .hash = hash,
.key = xmalloc(key_len),
.key_len = key_len,
.value = value,
.value_len = value_len,
.next = bucket->head };
// Copy Key and Value
memcpy(new_node->key, key, key_len);
//memcpy(new_node->value, value, value_len);
if (manage_mvalue) {
new_node->value = xmalloc(value_len);
memcpy(new_node->value, value, value_len);
} else {
new_node->value = value;
}
new_node->manage_mvalue = manage_mvalue;
bucket->head = new_node;
did_set = true;
@ -119,7 +127,7 @@ DONE:
* @returns boolean if node was deleted or not
*/
static inline bool
map_delete(struct hashmap *map, char *key, uint32_t key_len)
map_delete(struct hashmap *map, void *key, uint32_t key_len)
{
bool did_delete = false;
@ -131,7 +139,9 @@ map_delete(struct hashmap *map, char *key, uint32_t key_len)
if (prev != NULL && prev->hash == hash) {
bucket->head = prev->next;
free(prev->key);
//free(prev->value);
if (prev->manage_mvalue) {
free(prev->value);
}
free(prev);
did_delete = true;
goto DONE;
@ -140,7 +150,9 @@ map_delete(struct hashmap *map, char *key, uint32_t key_len)
for (struct map_node *node = prev->next; node != NULL; prev = node, node = node->next) {
prev->next = node->next;
free(node->key);
//free(node->value);
if (node->manage_mvalue) {
free(node->value);
}
free(node);
did_delete = true;
goto DONE;
@ -151,8 +163,8 @@ DONE:
return did_delete;
}
/* static inline void
map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32_t value_len)
static inline void
map_upsert(struct hashmap *map, void *key, uint32_t key_len, void *value, uint32_t value_len)
{
uint32_t hash = MAP_HASH(key, key_len);
struct map_bucket *bucket = &map->buckets[hash % MAP_BUCKET_COUNT];
@ -161,10 +173,13 @@ map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32
for (struct map_node *node = bucket->head; node != NULL; node = node->next) {
if (node->hash == hash) {
node->value_len = value_len;
//node->value = realloc(node->value, value_len);
node->value = value;
node->value = realloc(node->value, value_len);
//node->value = value;
assert(node->value);
//memcpy(node->value, value, value_len);
if (node->manage_mvalue)
{
memcpy(node->value, value, value_len);
}
goto DONE;
}
}
@ -183,11 +198,10 @@ map_upsert(struct hashmap *map, char *key, uint32_t key_len, void *value, uint32
// Copy Key and Value
memcpy(new_node->key, key, key_len);
new_node->value = value;
//memcpy(new_node->value, value, value_len);
memcpy(new_node->value, value, value_len);
bucket->head = new_node;
DONE:
LOCK_UNLOCK(&bucket->lock);
} */
}

@ -149,7 +149,7 @@ perf_window_get_percentile(struct perf_window *self, int percentile, int precomp
return 0;
}
if (likely(size >= PERF_WINDOW_BUFFER_SIZE)) return self->by_duration[precomputed_index].execution_time;
//if (likely(size >= PERF_WINDOW_BUFFER_SIZE)) return self->by_duration[precomputed_index].execution_time;
return self->by_duration[size * percentile / 100].execution_time;
}

@ -3,12 +3,12 @@
#include <stdlib.h>
#include "likely.h"
#include "panic.h"
//#include "panic.h"
static inline void *
xmalloc(size_t size)
{
void *allocation = malloc(size);
if (unlikely(allocation == NULL)) panic("xmalloc failed!\n");
//if (unlikely(allocation == NULL)) panic("xmalloc failed!\n");
return allocation;
}

@ -73,11 +73,21 @@ current_sandbox_start(void)
sandbox_initialize_stdio(sandbox);
int next_module_idx = sandbox->module->next_module_count;
static struct hashmap *sandbox_req_map = NULL;
if (sandbox_req_map == NULL) {
sandbox_req_map = malloc(sizeof(struct hashmap));
static struct hashmap *sandbox_req_map = NULL;
static struct hashmap *sandbox_request_id = NULL;
if (sandbox_req_map == NULL || sandbox_request_id == NULL) {
if(sandbox_req_map == NULL)
{
sandbox_req_map = malloc(sizeof(struct hashmap));
map_init(sandbox_req_map);
}
if(sandbox_request_id == NULL)
{
sandbox_request_id = malloc(sizeof(struct hashmap));
map_init(sandbox_request_id);
}
assert(sandbox_req_map != NULL);
map_init(sandbox_req_map);
assert(sandbox_request_id != NULL);
}
@ -130,61 +140,162 @@ current_sandbox_start(void)
*/
goto err;
} else if (next_module != NULL) {
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t enqueue_timestamp = __getcycles();
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
assert(next_module_idx);
assert(next_module);
for (size_t i = 0; i < next_module_idx; i++)
size_t next_module_pre_count_flag = next_module[0]->pre_module_count;
assert(next_module_pre_count_flag);
if (next_module_idx > 1 || (next_module_idx == 1 && next_module_pre_count_flag == 1))
{
struct module * next_module_node = next_module[i];
assert(next_module_node);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t enqueue_timestamp = __getcycles();
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
for (size_t i = 0; i < next_module_idx; i++)
{
struct module * next_module_node = next_module[i];
assert(next_module_node);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, enqueue_timestamp,
sandbox->remaining_slack, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
sandbox->remaining_slack, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
#ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) {
free(sandbox_request);
goto err;
}
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if (sandbox->absolute_deadline == sandbox_request->absolute_deadline) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox *next_sandbox = sandbox_allocate(sandbox_request);
if (!next_sandbox) {
free(sandbox_request);
goto err;
}
assert(next_sandbox->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(next_sandbox, SANDBOX_INITIALIZED);
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
#else
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
}
#endif
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
}else if (next_module_idx == 1 && next_module_pre_count_flag > 1)
{
/*Before each id is put into the hash table, the key needs to add a "module handle"*/
struct module * next_module_node = next_module[0];
assert(next_module_node);
char *cur_request_id = NULL;
int key_len = snprintf(NULL, 0, "%s%lu", next_module_node->name, sandbox->id) + 1;
cur_request_id = (char *)malloc(key_len);
assert(cur_request_id);
snprintf(cur_request_id, key_len, "%s%lu", next_module_node->name, sandbox->id);
uint32_t ret_value_len;
uint64_t *requet_id = (uint64_t *)map_get(sandbox_request_id, cur_request_id, strlen(cur_request_id), &ret_value_len);
if (!requet_id) {
//it means that the first sandbox is calculated, and it needs to wait for the return value of other sandboxes.
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
};
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t enqueue_timestamp = __getcycles();
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
next_module_node->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, enqueue_timestamp,
sandbox->remaining_slack, true, pre_func_output, output_length);
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy-wait to generate an unique id, should we optimize it here?
*/
sandbox_request->id = sandbox->id;
uint32_t module_pre_count = next_module[0]->pre_module_count;
module_pre_count--;
assert(module_pre_count);
map_set(sandbox_request_id, cur_request_id, strlen(cur_request_id), &module_pre_count, sizeof(uint32_t), true);
map_set(sandbox_req_map, cur_request_id, strlen(cur_request_id), sandbox_request, sizeof(struct sandbox_request *), false);
}else
{
uint32_t rest_pre_count =*requet_id;
assert(rest_pre_count >= 1);
struct sandbox_request *sandbox_request = map_get(sandbox_req_map, cur_request_id, strlen(cur_request_id), &ret_value_len);
assert(sandbox_request);
// Copy data into pre_func_output
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char *pre_func_output = (char *)malloc(output_length);
if (!pre_func_output) {
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
goto err;
}
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
uint64_t enqueue_timestamp = __getcycles();
const char *previous_output = sandbox_request->previous_function_output ? sandbox_request->previous_function_output : "";
int new_output_length = strlen(previous_output) + output_length + 2;
char *new_output = (char *)malloc(new_output_length);
if (!new_output) {
fprintf(stderr, "Failed to allocate memory for the new output: %s\n", strerror(errno));
free(pre_func_output);
goto err;
}
snprintf(new_output, new_output_length, "%s+%s", previous_output, pre_func_output);
free(sandbox_request->previous_function_output);
sandbox_request->previous_function_output = new_output;
free(pre_func_output);
sandbox_request->output_length +=output_length;
rest_pre_count --;
if (rest_pre_count != 0)
{
map_upsert(sandbox_request_id, cur_request_id, strlen(cur_request_id), &rest_pre_count, sizeof(uint32_t));
}else
{
global_request_scheduler_add(sandbox_request);
map_delete(sandbox_req_map, cur_request_id, strlen(cur_request_id));
map_delete(sandbox_request_id, cur_request_id, strlen(cur_request_id));
}
}
if (sandbox->request_from_outside) {
sandbox_remove_from_epoll(sandbox);
}
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
}
else
{
error_message = "the strcuture of DAG is not supported\n";
goto err;
}
} else {
/* Retrieve the result, construct the HTTP response, and send to client */

@ -6,6 +6,54 @@
"relative-deadline-us": 50000,
"argsize": 1,
"pre_module_count": 0,
"next_modules": ["work2", "work3"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work2",
"path": "work_wasm.so",
"port": 10001,
"relative-deadline-us": 50000,
"argsize": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work3",
"path": "work_wasm.so",
"port": 10002,
"relative-deadline-us": 50000,
"argsize": 1,
"pre_module_count": 1,
"next_modules": ["work4"],
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1048776,
"http-resp-headers": [],
"http-resp-size": 1048776,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "work4",
"path": "work_wasm.so",
"port": 10003,
"relative-deadline-us": 50000,
"argsize": 1,
"pre_module_count": 2,
"next_modules": [],
"http-req-headers": [],
"http-req-content-type": "text/plain",

Binary file not shown.

@ -26,5 +26,6 @@ main(void)
else
write(1, d, r);
free(d);
return 0;
}

Binary file not shown.

@ -1,6 +1,5 @@
#include "../include/map.h"
#include <stdio.h>
#include <string.h>
typedef struct {
int id;
@ -27,10 +26,10 @@ int main() {
// 将 Employee 结构体存入哈希表
char *key1 = "employee1";
map_set(&myMap, key1, strlen(key1), alice, sizeof(Employee*));
map_set(&myMap, key1, strlen(key1), alice, sizeof(Employee*), false);
char *key2 = "employee2";
map_set(&myMap, key2, strlen(key2), bob, sizeof(Employee*));
map_set(&myMap, key2, strlen(key2), bob, sizeof(Employee*), true);
// 尝试从哈希表中检索 Employee
uint32_t ret_value_len;
@ -42,24 +41,8 @@ int main() {
printf("Employee not found.\n");
}
alice->id = 12;
char *key3 = "employee1";
strcat(alice->name, key3);
retrieved_employee = (Employee *)map_get(&myMap, key1, strlen(key1), &ret_value_len);
if (retrieved_employee) {
printf("Retrieved Employee: %s, ID: %d, Salary: %.2f\n",
retrieved_employee->name, retrieved_employee->id, retrieved_employee->salary);
} else {
printf("Employee not found.\n");
}
map_delete(&myMap, key1, strlen(key1));
retrieved_employee = (Employee *)map_get(&myMap, key1, strlen(key1), &ret_value_len);
if (retrieved_employee) {
printf("Retrieved Employee: %s, ID: %d, Salary: %.2f\n",
retrieved_employee->name, retrieved_employee->id, retrieved_employee->salary);
} else {
printf("Employee not found.\n");
}
map_set(&myMap, key1, strlen(key1), alice, sizeof(Employee*));
char *key = "employee1";
strcat(alice->name, key);
retrieved_employee = (Employee *)map_get(&myMap, key1, strlen(key1), &ret_value_len);
if (retrieved_employee) {
printf("Retrieved Employee: %s, ID: %d, Salary: %.2f\n",
@ -67,11 +50,21 @@ int main() {
} else {
printf("Employee not found.\n");
}
uint64_t value1 = 20, value2 = 30;
char *key3 = "test1";
char *key4 = "test2";
map_set(&myMap, key3, strlen(key3), &value1, sizeof(uint64_t), true);
uint32_t *value = (uint32_t *)map_get(&myMap, key3, strlen(key3), &ret_value_len);
if (value) printf("Retrieved value: %d\n", *value); else printf("Value not found.\n");
value1 ++;
value = (uint32_t *)map_get(&myMap, key3, strlen(key3), &ret_value_len);
if (value) printf("Retrieved value: %d\n", *value); else printf("Value not found.\n");
map_upsert(&myMap, key3, strlen(key3), &value1, sizeof(uint64_t));
value = (uint32_t *)map_get(&myMap, key3, strlen(key3), &ret_value_len);
if (value) printf("Retrieved value: %d\n", *value); else printf("Value not found.\n");
// 清理
free(alice);
free(bob);
// 也许还需要遍历哈希表并释放所有节点,这里假设只是一个简单的示例
return 0;
}

@ -20,5 +20,9 @@ C: 01, T: 0x7ffff7bfdd80, F: runtime_start_runtime_worker_threads>
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
sledgert: src/software_interrupt.c:181: void software_interrupt_handle_signals(int, siginfo_t *, void *): Assertion `TEST_RECORDING_BUFFER_LEN > software_interrupt_SIGALRM_kernel_count + software_interrupt_SIGALRM_thread_count' failed.
sledgert: src/software_interrupt.c:181: void software_interrupt_handle_signals(int, siginfo_t *, void *): Assertion `TEST_RECORDING_BUFFER_LEN > software_interrupt_SIGALRM_kernel_count + software_interrupt_SIGALRM_thread_count' failed.
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288
C: 01, T: 0x7ffff7bfdd80, F: module_new>
Stack Size: 524288

Loading…
Cancel
Save