|
|
|
@ -9,10 +9,9 @@
|
|
|
|
|
#include "module.h"
|
|
|
|
|
#include "software_interrupt.h"
|
|
|
|
|
#include "map.h"
|
|
|
|
|
#include "hashmap.h"
|
|
|
|
|
|
|
|
|
|
extern uint64_t system_start_timestamp;
|
|
|
|
|
pthread_mutex_t lock;
|
|
|
|
|
lock_t lock;
|
|
|
|
|
|
|
|
|
|
__thread struct sandbox *worker_thread_current_sandbox = NULL;
|
|
|
|
|
|
|
|
|
@ -156,7 +155,7 @@ current_sandbox_start(void)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
|
|
|
|
|
uint64_t enqueue_timestamp = __getcycles();
|
|
|
|
|
//printf("这是图一分多 the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
|
|
|
|
|
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
|
|
|
|
|
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
|
|
|
|
|
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
|
|
|
|
@ -165,12 +164,20 @@ current_sandbox_start(void)
|
|
|
|
|
{
|
|
|
|
|
struct module * next_module_node = next_module[i];
|
|
|
|
|
assert(next_module_node);
|
|
|
|
|
char * individual_pre_func_output = (char *)malloc(output_length);
|
|
|
|
|
if (!individual_pre_func_output) {
|
|
|
|
|
fprintf(stderr, "Failed to allocate memory for the individual previous output: %s\n", strerror(errno));
|
|
|
|
|
free(pre_func_output);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
memcpy(individual_pre_func_output, pre_func_output, output_length);
|
|
|
|
|
uint64_t enqueue_timestamp = __getcycles();
|
|
|
|
|
struct sandbox_request *sandbox_request =
|
|
|
|
|
sandbox_request_allocate(next_module_node, false, sandbox->request_length,
|
|
|
|
|
next_module_node->name, sandbox->client_socket_descriptor,
|
|
|
|
|
(const struct sockaddr *)&sandbox->client_address,
|
|
|
|
|
sandbox->request_arrival_timestamp, enqueue_timestamp,
|
|
|
|
|
sandbox->remaining_slack, true, pre_func_output, output_length);
|
|
|
|
|
sandbox->remaining_slack, true, individual_pre_func_output, output_length);
|
|
|
|
|
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
|
|
|
|
|
* will busy-wait to generate an unique id, should we optimize it here?
|
|
|
|
|
*/
|
|
|
|
@ -203,8 +210,13 @@ current_sandbox_start(void)
|
|
|
|
|
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
|
|
|
|
|
}else if (next_module_idx == 1 && next_module_pre_count > 1)
|
|
|
|
|
{
|
|
|
|
|
pthread_mutex_init(&lock, NULL);
|
|
|
|
|
pthread_mutex_lock(&lock);
|
|
|
|
|
static bool lock_flag = true;
|
|
|
|
|
if (lock_flag)
|
|
|
|
|
{
|
|
|
|
|
LOCK_INIT(&lock);
|
|
|
|
|
lock_flag = false;
|
|
|
|
|
}
|
|
|
|
|
bool hash_flag = false;
|
|
|
|
|
/*Before each id is put into the hash table, the key needs to add a "module handle"*/
|
|
|
|
|
struct module * next_module_node = next_module[0];
|
|
|
|
|
assert(next_module_node);
|
|
|
|
@ -214,9 +226,11 @@ current_sandbox_start(void)
|
|
|
|
|
assert(cur_request_id);
|
|
|
|
|
snprintf(cur_request_id, key_len, "%s%lu", next_module_node->name, sandbox->id);
|
|
|
|
|
uint32_t ret_value_len;
|
|
|
|
|
|
|
|
|
|
LOCK_LOCK(&lock);
|
|
|
|
|
//printf("锁住了\n");
|
|
|
|
|
uint64_t *requet_id = (uint64_t *)map_get(sandbox_request_id, cur_request_id, strlen(cur_request_id), &ret_value_len);
|
|
|
|
|
if (!requet_id) {
|
|
|
|
|
if(!requet_id) hash_flag = true;
|
|
|
|
|
if (hash_flag) {
|
|
|
|
|
//it means that the first sandbox is calculated, and it needs to wait for the return value of other sandboxes.
|
|
|
|
|
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
|
|
|
|
|
char * pre_func_output = (char *)malloc(output_length);
|
|
|
|
@ -225,6 +239,7 @@ current_sandbox_start(void)
|
|
|
|
|
goto err;
|
|
|
|
|
};
|
|
|
|
|
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
|
|
|
|
|
//printf("创建sandbox_request 模块工作结果 the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
|
|
|
|
|
uint64_t enqueue_timestamp = __getcycles();
|
|
|
|
|
//uint64_t current_rs = enqueue_timestamp - system_start_timestamp;
|
|
|
|
|
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
|
|
|
|
@ -247,7 +262,9 @@ current_sandbox_start(void)
|
|
|
|
|
map_set(sandbox_request_id, cur_request_id, strlen(cur_request_id), &module_pre_count, sizeof(uint32_t), true);
|
|
|
|
|
map_set(sandbox_req_map, cur_request_id, strlen(cur_request_id), sandbox_request, sizeof(struct sandbox_request *), false);
|
|
|
|
|
free(cur_request_id);
|
|
|
|
|
}else
|
|
|
|
|
cur_request_id = NULL;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
uint32_t rest_pre_count = *requet_id;
|
|
|
|
|
assert(rest_pre_count >= 1);
|
|
|
|
@ -257,25 +274,22 @@ current_sandbox_start(void)
|
|
|
|
|
|
|
|
|
|
// Copy data into pre_func_output
|
|
|
|
|
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
|
|
|
|
|
char *pre_func_outputi = (char *)malloc(output_length);
|
|
|
|
|
if (!pre_func_outputi) {
|
|
|
|
|
char *pre_func_output = (char *)malloc(output_length);
|
|
|
|
|
if (!pre_func_output) {
|
|
|
|
|
fprintf(stderr, "Failed to allocate memory for the previous output: %s\n", strerror(errno));
|
|
|
|
|
free(pre_func_output);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
memcpy(pre_func_outputi, sandbox->request_response_data + sandbox->request_length, output_length);
|
|
|
|
|
|
|
|
|
|
uint64_t enqueue_timestamp = __getcycles();
|
|
|
|
|
|
|
|
|
|
const char *previous_output = sandbox_request->previous_function_output ? sandbox_request->previous_function_output : "";
|
|
|
|
|
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
|
|
|
|
|
//printf("这是合并sandbox_request模块工作结果 the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
|
|
|
|
|
ssize_t new_output_length = sandbox_request->output_length + output_length + 2;
|
|
|
|
|
char *new_output = (char *)malloc(new_output_length);
|
|
|
|
|
if (!new_output) {
|
|
|
|
|
fprintf(stderr, "Failed to allocate memory for the new output: %s\n", strerror(errno));
|
|
|
|
|
free(pre_func_outputi);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
memset(new_output, 0, new_output_length);
|
|
|
|
|
snprintf(new_output, new_output_length, "%s&%s", previous_output, pre_func_outputi);
|
|
|
|
|
snprintf(new_output, new_output_length, "%s&%s", sandbox_request->previous_function_output, pre_func_output);
|
|
|
|
|
if(sandbox_request->previous_function_output != NULL)
|
|
|
|
|
{
|
|
|
|
|
free(sandbox_request->previous_function_output);
|
|
|
|
@ -283,11 +297,12 @@ current_sandbox_start(void)
|
|
|
|
|
}
|
|
|
|
|
assert(new_output);
|
|
|
|
|
sandbox_request->previous_function_output = new_output;
|
|
|
|
|
free(pre_func_outputi);
|
|
|
|
|
pre_func_outputi = NULL;
|
|
|
|
|
//printf("这是合并sandbox_request 模块工作结果 the ID %lu %s new_output is %s\n", sandbox->id, sandbox->module->name, new_output);
|
|
|
|
|
free(pre_func_output);
|
|
|
|
|
pre_func_output = NULL;
|
|
|
|
|
sandbox_request->output_length = new_output_length;
|
|
|
|
|
|
|
|
|
|
rest_pre_count --;
|
|
|
|
|
rest_pre_count--;
|
|
|
|
|
if (rest_pre_count != 0)
|
|
|
|
|
{
|
|
|
|
|
map_upsert(sandbox_request_id, cur_request_id, strlen(cur_request_id), &rest_pre_count, sizeof(uint32_t));
|
|
|
|
@ -300,17 +315,21 @@ current_sandbox_start(void)
|
|
|
|
|
map_delete(sandbox_request_id, cur_request_id, strlen(cur_request_id));
|
|
|
|
|
}
|
|
|
|
|
free(cur_request_id);
|
|
|
|
|
cur_request_id = NULL;
|
|
|
|
|
}
|
|
|
|
|
//printf("解锁");
|
|
|
|
|
LOCK_UNLOCK(&lock);
|
|
|
|
|
//printf("没有死索\n");
|
|
|
|
|
if (sandbox->request_from_outside) {
|
|
|
|
|
sandbox_remove_from_epoll(sandbox);
|
|
|
|
|
}
|
|
|
|
|
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
|
|
|
|
|
goto done;
|
|
|
|
|
}else
|
|
|
|
|
{
|
|
|
|
|
error_message = "the strcuture of DAG is not supported\n";
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock(&lock);
|
|
|
|
|
} else {
|
|
|
|
|
/* Retrieve the result, construct the HTTP response, and send to client */
|
|
|
|
|
if (sandbox_send_response(sandbox) < 0) {
|
|
|
|
|