support chain function calling by introducing hashtable for retrieving module struct

main
xiaosuGW 3 years ago
parent 8b9279c640
commit e6be55b1ed

@ -10,10 +10,10 @@ PAGE_SIZE := $(shell getconf PAGESIZE)
# Compiler Settings
CC=clang
CC_OPTIONS = -O3 -flto -g -pthread -D_GNU_SOURCE
#CC_OPTIONS = -O3 -flto -g -pthread -D_GNU_SOURCE
# CC_OPTIONS for Debugging
# CC_OPTIONS = -O0 -g -pthread -D_GNU_SOURCE
CC_OPTIONS = -O0 -g -pthread -D_GNU_SOURCE
# CFI Sanitizer
# CC_OPTIONS = -O0 -g -pthread -D_GNU_SOURCE -flto -fvisibility=default -fsanitize=cfi
@ -91,6 +91,7 @@ CFLAGS += -DUSE_MEM_VM
LDFLAGS += -Wl,--export-dynamic -ldl -lm
LDFLAGS += -Lthirdparty/dist/lib/
INCLUDES += -Iinclude/ -Ithirdparty/dist/include/
INCLUDES += -Iinclude/ -Ithirdparty/ck/include/
# CFILES
CFILES += src/*.c
@ -99,6 +100,7 @@ CFILES += src/libc/*.c
CFILES += src/memory/common.c
CFILES += src/memory/64bit_nix.c
CFILES += thirdparty/dist/lib/http_parser.o
CFILES += thirdparty/dist/lib/libck.a
# Configuring Jasmine
JSMNCFLAGS += -DJSMN_STATIC

@ -41,7 +41,7 @@ arch_context_init(struct arch_context *actx, reg_t ip, reg_t sp)
* which defaults to resuming execution of main
*/
static inline int
arch_context_switch(struct arch_context *a, struct arch_context *b)
arch_context_switch(struct arch_context *a, struct arch_context *b) //save context to a and switch to context b
{
#ifndef NDEBUG
/*
@ -61,7 +61,7 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
assert(a != b);
/* Set any NULLs to worker_thread_base_context to resume execution of main */
if (a == NULL) a = &worker_thread_base_context;
if (a == NULL) a = &worker_thread_base_context; //NULL indicates the main context of the worker thread before running any sandboxs
if (b == NULL) b = &worker_thread_base_context;
/* A Transition {Unused, Running} -> Fast */

@ -80,6 +80,8 @@ arch_context_restore_new(mcontext_t *active_context, struct arch_context *sandbo
*
* NULL in either of these values indicates the "no sandbox to execute" state,
* which defaults to resuming execution of main
* save the current context to a, and switch to b, if b is fast path, then only restore b's IP
* and SP; if b is slow path, then send a SIGUSER1 to restore a full context of b
*/
static inline int
arch_context_switch(struct arch_context *a, struct arch_context *b)

@ -0,0 +1,9 @@
#pragma once
#include "module.h"
#include "ck_ht.h"
extern ck_ht_t g_module_ht;
extern void init_module_ht();
extern void insert_module_to_ht(const uint32_t port, const struct module* module);
extern struct module* get_module_from_ht(const uint32_t port);

@ -28,6 +28,16 @@ sandbox_close_http(struct sandbox *sandbox)
client_socket_close(sandbox->client_socket_descriptor, &sandbox->client_address);
}
static inline void
sandbox_remove_from_epoll(struct sandbox *sandbox)
{
assert(sandbox != NULL);
int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, sandbox->client_socket_descriptor, NULL);
if (unlikely(rc < 0)) panic_err();
}
/**
* Initializes a sandbox fd ready for use with the proper preopen magic
* @param sandbox
@ -133,7 +143,7 @@ sandbox_open_http(struct sandbox *sandbox)
http_parser_init(&sandbox->http_parser, HTTP_REQUEST);
/* Set the sandbox as the data the http-parser has access to */
sandbox->http_parser.data = sandbox;
sandbox->http_parser.data = sandbox; //assign data to sandbox in case to operator it when a callback happended
/* Freshly allocated sandbox going runnable for first time, so register client socket with epoll */
struct epoll_event accept_evt;

@ -15,13 +15,17 @@
struct sandbox_request {
uint64_t id;
bool request_from_outside; /* true is yes, false is no */
int current_func_index;
struct module * module;
char * arguments;
int socket_descriptor;
struct sockaddr socket_address;
uint64_t request_arrival_timestamp; /* cycles */
uint64_t absolute_deadline; /* cycles */
char * previous_function_output;
ssize_t output_length;
ssize_t pre_request_length; /* previous request length */
/*
* Unitless estimate of the instantaneous fraction of system capacity required to run the request
* Calculated by estimated execution time (cycles) * runtime_admissions_granularity / relative deadline (cycles)
@ -65,9 +69,10 @@ sandbox_request_log_allocation(struct sandbox_request *sandbox_request)
* @return the new sandbox request
*/
static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, char *arguments, int socket_descriptor,
sandbox_request_allocate(struct module *module, bool request_from_outside, ssize_t request_length, int current_func_index,
char *arguments, int socket_descriptor,
const struct sockaddr *socket_address, uint64_t request_arrival_timestamp,
uint64_t admissions_estimate)
uint64_t admissions_estimate, char *previous_function_output, ssize_t output_length)
{
struct sandbox_request *sandbox_request = (struct sandbox_request *)malloc(sizeof(struct sandbox_request));
assert(sandbox_request);
@ -76,11 +81,16 @@ sandbox_request_allocate(struct module *module, char *arguments, int socket_desc
sandbox_request->id = sandbox_request_count_postfix_increment();
sandbox_request->module = module;
sandbox_request->request_from_outside = request_from_outside;
sandbox_request->current_func_index = current_func_index;
sandbox_request->arguments = arguments;
sandbox_request->socket_descriptor = socket_descriptor;
memcpy(&sandbox_request->socket_address, socket_address, sizeof(struct sockaddr));
sandbox_request->request_arrival_timestamp = request_arrival_timestamp;
sandbox_request->absolute_deadline = request_arrival_timestamp + module->relative_deadline;
sandbox_request->previous_function_output = previous_function_output;
sandbox_request->output_length = output_length;
sandbox_request->pre_request_length = request_length;
/*
* Admissions Control State

@ -33,6 +33,11 @@ sandbox_set_as_initialized(struct sandbox *sandbox, struct sandbox_request *sand
sandbox->allocation_timestamp = allocation_timestamp;
sandbox->state = SANDBOX_SET_AS_INITIALIZED;
sandbox->current_func_index = sandbox_request->current_func_index;
sandbox->request_from_outside = sandbox_request->request_from_outside;
sandbox->previous_function_output = sandbox_request->previous_function_output;
sandbox->output_length = sandbox_request->output_length;
sandbox->pre_request_length = sandbox_request->pre_request_length;
/* Initialize the sandbox's context, stack, and instruction pointer */
/* stack_start points to the bottom of the usable stack, so add stack_size to get to top */
arch_context_init(&sandbox->ctxt, (reg_t)current_sandbox_start,

@ -31,6 +31,11 @@ struct sandbox_io_handle {
struct sandbox {
uint64_t id;
bool request_from_outside;
int current_func_index; /* indicate the index of next function in the chain */
char * previous_function_output; /* the output of the previous function */
ssize_t output_length; /* the length of previous_function_output */
ssize_t pre_request_length; /* the length of previous_function_output */
sandbox_state_t state;
uint32_t sandbox_size; /* The struct plus enough buffer to hold the request or response (sized off largest) */

@ -0,0 +1,4 @@
#pragma once
extern unsigned short g_single_function_flow_table[];
extern int g_chain_length;

@ -6,6 +6,9 @@
#include "sandbox_set_as_returned.h"
#include "sandbox_setup_arguments.h"
#include "scheduler.h"
#include "workflow.h"
#include "module.h"
#include "module_manager.h"
#include "software_interrupt.h"
__thread struct sandbox *worker_thread_current_sandbox = NULL;
@ -68,19 +71,32 @@ current_sandbox_start(void)
sandbox_initialize_stdio(sandbox);
sandbox_open_http(sandbox);
if (sandbox_receive_request(sandbox) < 0) {
error_message = "Unable to receive or parse client request\n";
goto err;
};
sandbox_open_http(sandbox);//add IN/OUT event to epoll
if (sandbox->request_from_outside) {
if (sandbox_receive_request(sandbox) < 0) {//read data from client socket to get http request,
//the result populates http_request structure
// if read blocked, then remove the sandbox from the
// local runqueue and pause the sandbox
error_message = "Unable to receive or parse client request\n";
goto err;
};
} else {
// copy previous output to sandbox->request_response_data, as the input for the sandbox.
// let sandbox->http_request->body points to sandbox->request_response_data
assert(sandbox->previous_function_output != NULL);
memcpy(sandbox->request_response_data, sandbox->previous_function_output, sandbox->output_length);
sandbox->http_request.body = sandbox->request_response_data;
sandbox->http_request.body_length = sandbox->output_length;
sandbox->request_length = sandbox->pre_request_length;
sandbox->request_response_data_length = sandbox->request_length;
}
/* Initialize sandbox memory */
struct module *current_module = sandbox_get_module(sandbox);
module_initialize_globals(current_module);
module_initialize_memory(current_module);
sandbox_setup_arguments(sandbox);
sandbox_setup_arguments(sandbox); //this arguments is not the http body content
/* Executing the function */
int32_t argument_count = module_get_argument_count(current_module);
current_sandbox_enable_preemption(sandbox);
@ -88,24 +104,49 @@ current_sandbox_start(void)
current_sandbox_disable_preemption(sandbox);
sandbox->completion_timestamp = __getcycles();
/* Retrieve the result, construct the HTTP response, and send to client */
if (sandbox_send_response(sandbox) < 0) {
error_message = "Unable to build and send client response\n";
goto err;
};
http_total_increment_2xx();
sandbox->response_timestamp = __getcycles();
assert(sandbox->state == SANDBOX_RUNNING);
sandbox_close_http(sandbox);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
if (sandbox->current_func_index + 1 < g_chain_length) {
uint32_t next_port = g_single_function_flow_table[sandbox->current_func_index + 1];
struct module * next_module = get_module_from_ht(next_port);
assert(next_module != NULL);
//generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue
ssize_t output_length = sandbox->request_response_data_length - sandbox->request_length;
char * pre_func_output = (char *) malloc(output_length);
memcpy(pre_func_output, sandbox->request_response_data + sandbox->request_length, output_length);
struct sandbox_request *sandbox_request =
sandbox_request_allocate(next_module, false, sandbox->request_length, sandbox->current_func_index + 1,
next_module->name, sandbox->client_socket_descriptor,
(const struct sockaddr *)&sandbox->client_address,
sandbox->request_arrival_timestamp, true, pre_func_output, output_length);
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);
sandbox_remove_from_epoll(sandbox);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING);
scheduler_yield();
assert(0);
return;
} else {
/* Retrieve the result, construct the HTTP response, and send to client */
if (sandbox_send_response(sandbox) < 0) { // if send blocked, remove the sandbox from the local runqueue
// and pause the sandbox
error_message = "Unable to build and send client response\n";
goto err;
};
http_total_increment_2xx();
sandbox->response_timestamp = __getcycles();
assert(sandbox->state == SANDBOX_RUNNING);
sandbox_close_http(sandbox);
sandbox_set_as_returned(sandbox, SANDBOX_RUNNING); //request is completed, remove the sandbox from the
//local runqueue
}
done:
/* Cleanup connection and exit sandbox */
generic_thread_dump_lock_overhead();
scheduler_yield();
scheduler_yield(); //put the sandbox to the complete queue
/* This assert prevents a segfault discussed in
* https://github.com/phanikishoreg/awsm-Serverless-Framework/issues/66

@ -19,7 +19,7 @@ global_request_scheduler_minheap_add(void *sandbox_request)
{
assert(sandbox_request);
assert(global_request_scheduler_minheap);
if (unlikely(!listener_thread_is_running())) panic("%s is only callable by the listener thread\n", __func__);
//if (unlikely(!listener_thread_is_running())) panic("%s is only callable by the listener thread\n", __func__);
int return_code = priority_queue_enqueue(global_request_scheduler_minheap, sandbox_request);
/* TODO: Propagate -1 to caller. Issue #91 */
@ -75,7 +75,7 @@ sandbox_request_get_priority_fn(void *element)
*/
void
global_request_scheduler_minheap_initialize()
{
{ /* second parameter is set to true, means we will use lock for this queue since it is a global queue shared by all worker threads */
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_request_get_priority_fn);
struct global_request_scheduler_config config = {

@ -177,9 +177,9 @@ listener_thread_main(void *dummy)
/* Allocate a Sandbox Request */
struct sandbox_request *sandbox_request =
sandbox_request_allocate(module, module->name, client_socket,
sandbox_request_allocate(module, true, 0, 0, module->name, client_socket,
(const struct sockaddr *)&client_address,
request_arrival_timestamp, work_admitted);
request_arrival_timestamp, work_admitted, NULL, 0);
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add(sandbox_request);

@ -12,6 +12,7 @@
#include "likely.h"
#include "listener_thread.h"
#include "module.h"
#include "module_manager.h"
#include "module_database.h"
#include "panic.h"
#include "runtime.h"
@ -560,6 +561,7 @@ module_new_from_json(char *file_name)
module_set_http_info(module, request_count, request_headers, request_content_type,
response_count, reponse_headers, response_content_type);
module_count++;
insert_module_to_ht((uint32_t)port, module);
}
free(request_headers);

@ -0,0 +1,104 @@
#include "module_manager.h"
ck_ht_t g_module_ht;
static void *
ht_malloc(size_t r)
{
return malloc(r);
}
static void
ht_free(void *p, size_t b, bool r)
{
(void)b;
(void)r;
free(p);
return;
}
static void
ht_hash_wrapper(struct ck_ht_hash *h,
const void *key,
size_t length,
uint64_t seed)
{
//h->value = (unsigned long)MurmurHash64A(key, length, seed);
return;
}
static struct ck_malloc my_allocator = {
.malloc = ht_malloc,
.free = ht_free
};
/**
* Module Manager Hashtable initilization Function
* Initilize the glaoble hashtable g_module_ht
* Returns true if initlization success, false otherwise
*/
void
init_module_ht()
{
if (ck_ht_init(&g_module_ht, CK_HT_MODE_DIRECT, NULL, &my_allocator, 2, 6602834) == false) {
perror("ck_ht_init");
exit(EXIT_FAILURE);
}
}
/**
* Module Manager Insert Function
* Insert a module with a key into a hashtable, the key is the port number.
* The value is the module object pointer
*
* @param port - the TCP port number that the module will listen to
* @param module - the module object pointer that will be inserted into the hashtable
*/
void
insert_module_to_ht(const uint32_t port, const struct module* module)
{
assert(module != NULL);
ck_ht_entry_t entry;
ck_ht_hash_t h;
ck_ht_hash_direct(&h, &g_module_ht, (uintptr_t)port);
ck_ht_entry_set_direct(&entry, h, (uintptr_t)port, (uintptr_t)module);
ck_ht_put_spmc(&g_module_ht, h, &entry);
printf("insert port %u, module=%p\n", port, module);
}
/**
* Module Manager Get Function
* Get a module from the hashtable with a key, the key is the port number.
* Returns the module object pointer
*
*/
struct module*
get_module_from_ht(const uint32_t port)
{
printf("try to get module with port %u\n", port);
ck_ht_entry_t entry;
ck_ht_hash_t h;
ck_ht_hash_direct(&h, &g_module_ht, (uintptr_t)port);
ck_ht_entry_key_set_direct(&entry, (uintptr_t)port);
if (ck_ht_get_spmc(&g_module_ht, h, &entry) == false) { //
printf("ERROR: Found non-existing entry with port %u.\n", port);
return NULL;
}
uintptr_t k, v;
k = ck_ht_entry_key_direct(&entry);// get key from entry
v = ck_ht_entry_value_direct(&entry);//get value from entry
if (unlikely(k != port)) {
printf("ERROR: key doesnt match with port %u.\n", port);
return NULL;
} else {
printf("module found, address is =%p\n", v);
return (struct module*) v;
}
}

@ -19,6 +19,7 @@
#include "http_parser_settings.h"
#include "listener_thread.h"
#include "module.h"
#include "module_manager.h"
#include "runtime.h"
#include "sandbox_request.h"
#include "scheduler.h"
@ -65,12 +66,12 @@ runtime_set_resource_limits_to_max()
int resource = resources[i];
if (getrlimit(resource, &limit) < 0) panic_err();
if (limit.rlim_cur == RLIM_INFINITY) {
if (limit.rlim_cur == RLIM_INFINITY) { // rlim_cur is soft limit
strncpy(lim, "Infinite", uint64_t_max_digits);
} else {
snprintf(lim, uint64_t_max_digits, "%lu", limit.rlim_cur);
}
if (limit.rlim_max == RLIM_INFINITY) {
if (limit.rlim_max == RLIM_INFINITY) { // rlim_max is hard limit
strncpy(max, "Infinite", uint64_t_max_digits);
} else {
snprintf(max, uint64_t_max_digits, "%lu", limit.rlim_max);
@ -91,18 +92,21 @@ runtime_set_resource_limits_to_max()
void
runtime_initialize(void)
{
http_total_init();
sandbox_request_count_initialize();
sandbox_count_initialize();
http_total_init(); //initilize http requests/error response counter
sandbox_request_count_initialize(); //initilize sandbox requests counter
sandbox_count_initialize(); //initilize sandbox state counter
/* Setup Scheduler */
scheduler_initialize();
scheduler_initialize(); //set function pointers to global_request_scheduler, both EDF and FIFO have a set of functions
/* Init global module hash table */
init_module_ht();
/* Configure Signals */
signal(SIGPIPE, SIG_IGN);
signal(SIGTERM, runtime_cleanup);
signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE
signal(SIGTERM, runtime_cleanup); // call runtime_cleanup when get SIGTERM signal
http_parser_settings_initialize();
http_parser_settings_initialize(); // set function pointers for http parser lib, when get a http message, some callback
// functions will be called
admissions_control_initialize();
}

@ -186,9 +186,12 @@ sandbox_free(struct sandbox *sandbox)
assert(sandbox->state == SANDBOX_ERROR || sandbox->state == SANDBOX_COMPLETE);
int rc;
if (sandbox->previous_function_output != NULL) {
free(sandbox->previous_function_output);
sandbox->previous_function_output = NULL;
}
module_release(sandbox->module);
/* Free Sandbox Stack */
errno = 0;

@ -0,0 +1,6 @@
#include "workflow.h"
unsigned short g_single_function_flow_table[] = {10000, 10001};
int g_chain_length = 2;
int g_execution_index = 0;

@ -3,8 +3,8 @@ ARCH := $(shell uname -m)
CC=clang # Source -> Native
WASMCC=wasm32-unknown-unknown-wasm-clang # Source -> WebAssembly
OPTFLAGS=-O3 -flto
# OPTFLAGS=-O0 -flto -g
# OPTFLAGS=-O3 -flto
OPTFLAGS=-O0 -flto -g
MEMC_64=64bit_nix.c
# MEMC_NO=no_protection.c
# MEMC_GEN=generic.c

@ -0,0 +1,19 @@
#!/bin/bash
# Executes the runtime in GDB
# Substitutes the absolute path from the container with a path relatively derived from the location of this script
# This allows debugging outside of the Docker container
# Also disables pagination and stopping on SIGUSR1
declare project_path="$(
cd "$(dirname "$1")/../.."
pwd
)"
echo $project_path
cd ../../bin
export LD_LIBRARY_PATH="$(pwd):$LD_LIBRARY_PATH"
gdb --eval-command="handle SIGUSR1 nostop" \
--eval-command="set pagination off" \
--eval-command="set substitute-path /sledge/runtime $project_path" \
--eval-command="run ../tests/my_fibonacci.json" \
./sledgert
cd ../../tests

@ -0,0 +1,30 @@
{
"active": true,
"name": "fibonacci",
"path": "fibonacci_wasm.so",
"port": 10000,
"expected-execution-us": 600,
"relative-deadline-us": 2000,
"argsize": 2,
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
},
{
"active": true,
"name": "fibonacci",
"path": "fibonacci_wasm.so",
"port": 10001,
"expected-execution-us": 600,
"relative-deadline-us": 2000,
"argsize": 2,
"http-req-headers": [],
"http-req-content-type": "text/plain",
"http-req-size": 1024,
"http-resp-headers": [],
"http-resp-size": 1024,
"http-resp-content-type": "text/plain"
}
Loading…
Cancel
Save