support linear chain function calling by using list to chain modules

main
xiaosuGW 3 years ago
parent e6be55b1ed
commit 3bbc2f28fd

@ -78,6 +78,7 @@ struct module {
/* Entry Function to invoke serverless function */ /* Entry Function to invoke serverless function */
mod_main_fn_t main; mod_main_fn_t main;
struct module *next_module; /* the next module in the chain */
}; };
/************************* /*************************

@ -1,9 +0,0 @@
#pragma once
#include "module.h"
#include "ck_ht.h"
extern ck_ht_t g_module_ht;
extern void init_module_ht();
extern void insert_module_to_ht(const uint32_t port, const struct module* module);
extern struct module* get_module_from_ht(const uint32_t port);

@ -1,4 +0,0 @@
#pragma once
extern unsigned short g_single_function_flow_table[];
extern int g_chain_length;

@ -6,9 +6,7 @@
#include "sandbox_set_as_returned.h" #include "sandbox_set_as_returned.h"
#include "sandbox_setup_arguments.h" #include "sandbox_setup_arguments.h"
#include "scheduler.h" #include "scheduler.h"
#include "workflow.h"
#include "module.h" #include "module.h"
#include "module_manager.h"
#include "software_interrupt.h" #include "software_interrupt.h"
__thread struct sandbox *worker_thread_current_sandbox = NULL; __thread struct sandbox *worker_thread_current_sandbox = NULL;
@ -105,10 +103,11 @@ current_sandbox_start(void)
sandbox->completion_timestamp = __getcycles(); sandbox->completion_timestamp = __getcycles();
if (sandbox->current_func_index + 1 < g_chain_length) { struct module * next_module = sandbox->module->next_module;
uint32_t next_port = g_single_function_flow_table[sandbox->current_func_index + 1]; //if (sandbox->current_func_index + 1 < g_chain_length) {
struct module * next_module = get_module_from_ht(next_port); if (next_module != NULL) {
assert(next_module != NULL); //uint32_t next_port = g_single_function_flow_table[sandbox->current_func_index + 1];
//struct module * next_module = get_module_from_ht(next_port);
//generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue //generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length; ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;

@ -12,7 +12,6 @@
#include "likely.h" #include "likely.h"
#include "listener_thread.h" #include "listener_thread.h"
#include "module.h" #include "module.h"
#include "module_manager.h"
#include "module_database.h" #include "module_database.h"
#include "panic.h" #include "panic.h"
#include "runtime.h" #include "runtime.h"
@ -375,6 +374,7 @@ module_new_from_json(char *file_name)
int module_count = 0; int module_count = 0;
char *request_headers = NULL; char *request_headers = NULL;
char *reponse_headers = NULL; char *reponse_headers = NULL;
struct module *tail_module = NULL;
for (int i = 0; i < total_tokens; i++) { for (int i = 0; i < total_tokens; i++) {
assert(tokens[i].type == JSMN_OBJECT); assert(tokens[i].type == JSMN_OBJECT);
@ -556,12 +556,21 @@ module_new_from_json(char *file_name)
relative_deadline_us, port, request_size, response_size, relative_deadline_us, port, request_size, response_size,
admissions_percentile, expected_execution_us); admissions_percentile, expected_execution_us);
if (module == NULL) goto module_new_err; if (module == NULL) goto module_new_err;
assert(module); assert(module);
if (tail_module == NULL) {
tail_module = module;
tail_module->next_module = NULL;
} else {
tail_module->next_module = module;
tail_module = module;
tail_module->next_module = NULL;
}
module_set_http_info(module, request_count, request_headers, request_content_type, module_set_http_info(module, request_count, request_headers, request_content_type,
response_count, reponse_headers, response_content_type); response_count, reponse_headers, response_content_type);
module_count++; module_count++;
insert_module_to_ht((uint32_t)port, module);
} }
free(request_headers); free(request_headers);

@ -1,104 +0,0 @@
#include "module_manager.h"
ck_ht_t g_module_ht;
static void *
ht_malloc(size_t r)
{
return malloc(r);
}
static void
ht_free(void *p, size_t b, bool r)
{
(void)b;
(void)r;
free(p);
return;
}
static void
ht_hash_wrapper(struct ck_ht_hash *h,
const void *key,
size_t length,
uint64_t seed)
{
//h->value = (unsigned long)MurmurHash64A(key, length, seed);
return;
}
static struct ck_malloc my_allocator = {
.malloc = ht_malloc,
.free = ht_free
};
/**
* Module Manager Hashtable initilization Function
* Initilize the glaoble hashtable g_module_ht
* Returns true if initlization success, false otherwise
*/
void
init_module_ht()
{
if (ck_ht_init(&g_module_ht, CK_HT_MODE_DIRECT, NULL, &my_allocator, 2, 6602834) == false) {
perror("ck_ht_init");
exit(EXIT_FAILURE);
}
}
/**
* Module Manager Insert Function
* Insert a module with a key into a hashtable, the key is the port number.
* The value is the module object pointer
*
* @param port - the TCP port number that the module will listen to
* @param module - the module object pointer that will be inserted into the hashtable
*/
void
insert_module_to_ht(const uint32_t port, const struct module* module)
{
assert(module != NULL);
ck_ht_entry_t entry;
ck_ht_hash_t h;
ck_ht_hash_direct(&h, &g_module_ht, (uintptr_t)port);
ck_ht_entry_set_direct(&entry, h, (uintptr_t)port, (uintptr_t)module);
ck_ht_put_spmc(&g_module_ht, h, &entry);
printf("insert port %u, module=%p\n", port, module);
}
/**
* Module Manager Get Function
* Get a module from the hashtable with a key, the key is the port number.
* Returns the module object pointer
*
*/
struct module*
get_module_from_ht(const uint32_t port)
{
printf("try to get module with port %u\n", port);
ck_ht_entry_t entry;
ck_ht_hash_t h;
ck_ht_hash_direct(&h, &g_module_ht, (uintptr_t)port);
ck_ht_entry_key_set_direct(&entry, (uintptr_t)port);
if (ck_ht_get_spmc(&g_module_ht, h, &entry) == false) { //
printf("ERROR: Found non-existing entry with port %u.\n", port);
return NULL;
}
uintptr_t k, v;
k = ck_ht_entry_key_direct(&entry);// get key from entry
v = ck_ht_entry_value_direct(&entry);//get value from entry
if (unlikely(k != port)) {
printf("ERROR: key doesnt match with port %u.\n", port);
return NULL;
} else {
printf("module found, address is =%p\n", v);
return (struct module*) v;
}
}

@ -19,7 +19,6 @@
#include "http_parser_settings.h" #include "http_parser_settings.h"
#include "listener_thread.h" #include "listener_thread.h"
#include "module.h" #include "module.h"
#include "module_manager.h"
#include "runtime.h" #include "runtime.h"
#include "sandbox_request.h" #include "sandbox_request.h"
#include "scheduler.h" #include "scheduler.h"
@ -99,8 +98,6 @@ runtime_initialize(void)
/* Setup Scheduler */ /* Setup Scheduler */
scheduler_initialize(); //set function pointers to global_request_scheduler, both EDF and FIFO have a set of functions scheduler_initialize(); //set function pointers to global_request_scheduler, both EDF and FIFO have a set of functions
/* Init global module hash table */
init_module_ht();
/* Configure Signals */ /* Configure Signals */
signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
signal(SIGTERM, runtime_cleanup); // call runtime_cleanup when get SIGTERM signal signal(SIGTERM, runtime_cleanup); // call runtime_cleanup when get SIGTERM signal

@ -1,6 +0,0 @@
#include "workflow.h"
unsigned short g_single_function_flow_table[] = {10000, 10001};
int g_chain_length = 2;
int g_execution_index = 0;
Loading…
Cancel
Save