# include "current_sandbox.h"
# include "sandbox_functions.h"
# include "sandbox_receive_request.h"
# include "sandbox_send_response.h"
# include "sandbox_set_as_error.h"
# include "sandbox_set_as_returned.h"
# include "sandbox_setup_arguments.h"
# include "scheduler.h"
# include "module.h"
# include "software_interrupt.h"
# include "map.h"
extern uint64_t system_start_timestamp ;
lock_t lock ;
# define OUTPUT_BUFER_SIZE 1024*5
__thread struct sandbox * worker_thread_current_sandbox = NULL ;
__thread struct sandbox_context_cache local_sandbox_context_cache = {
. linear_memory_start = NULL ,
. linear_memory_size = 0 ,
. module_indirect_table = NULL ,
} ;
static inline void
current_sandbox_enable_preemption ( struct sandbox * sandbox )
{
# ifdef LOG_PREEMPTION
debuglog ( " Sandbox %lu - enabling preemption - Missed %d SIGALRM \n " , sandbox - > id ,
software_interrupt_deferred_sigalrm ) ;
fflush ( stderr ) ;
# endif
if ( __sync_bool_compare_and_swap ( & sandbox - > ctxt . preemptable , 0 , 1 ) = = false ) {
panic ( " Recursive call to current_sandbox_enable_preemption \n " ) ;
}
if ( software_interrupt_deferred_sigalrm > 0 ) {
/* Update Max */
if ( software_interrupt_deferred_sigalrm > software_interrupt_deferred_sigalrm_max [ worker_thread_idx ] ) {
software_interrupt_deferred_sigalrm_max [ worker_thread_idx ] =
software_interrupt_deferred_sigalrm ;
}
software_interrupt_deferred_sigalrm = 0 ;
// TODO: Replay. Does the replay need to be before or after enabling preemption?
}
}
static inline void
current_sandbox_disable_preemption ( struct sandbox * sandbox )
{
# ifdef LOG_PREEMPTION
debuglog ( " Sandbox %lu - disabling preemption \n " , sandbox - > id ) ;
fflush ( stderr ) ;
# endif
if ( __sync_bool_compare_and_swap ( & sandbox - > ctxt . preemptable , 1 , 0 ) = = false ) {
panic ( " Recursive call to current_sandbox_disable_preemption \n " ) ;
}
}
static inline void
current_sandbox_get_newlaxity ( struct sandbox * sandbox , uint64_t now )
{
assert ( sandbox ) ;
sandbox - > remaining_slack - = ( now - sandbox - > last_update_timestamp ) ;
}
/**
* Sandbox execution logic
* Handles setup , request parsing , WebAssembly initialization , function execution , response building and
* sending , and cleanup
*/
void
current_sandbox_start ( void )
{
struct sandbox * sandbox = current_sandbox_get ( ) ;
assert ( sandbox ! = NULL ) ;
assert ( sandbox - > state = = SANDBOX_RUNNING ) ;
char * error_message = " " ;
sandbox_initialize_stdio ( sandbox ) ;
int next_module_idx = sandbox - > module - > next_module_count ;
static struct hashmap * sandbox_req_map = NULL ;
static struct hashmap * sandbox_request_id = NULL ;
if ( sandbox_req_map = = NULL | | sandbox_request_id = = NULL ) {
if ( sandbox_req_map = = NULL )
{
sandbox_req_map = malloc ( sizeof ( struct hashmap ) ) ;
map_init ( sandbox_req_map ) ;
}
if ( sandbox_request_id = = NULL )
{
sandbox_request_id = malloc ( sizeof ( struct hashmap ) ) ;
map_init ( sandbox_request_id ) ;
}
assert ( sandbox_req_map ! = NULL ) ;
assert ( sandbox_request_id ! = NULL ) ;
}
struct module * * next_module = sandbox - > module - > next_module ;
/*
* Add the client fd to epoll if it is the first or last sandbox in the chain because they
* need to read and write from / to this fd
*/
if ( sandbox - > request_from_outside | | next_module = = NULL ) {
sandbox_open_http ( sandbox ) ;
}
if ( sandbox - > request_from_outside ) {
if ( sandbox_receive_request ( sandbox ) < 0 ) {
error_message = " Unable to receive or parse client request \n " ;
goto err ;
}
} else {
/*
* Copy previous output to sandbox - > request_response_data , as the input for the current sandbox .
* Let sandbox - > http_request - > body points to sandbox - > request_response_data
*/
assert ( sandbox - > previous_function_output ! = NULL ) ;
memcpy ( sandbox - > request_response_data , sandbox - > previous_function_output , sandbox - > output_length ) ;
sandbox - > http_request . body = sandbox - > request_response_data ;
sandbox - > http_request . body_length = sandbox - > output_length ;
sandbox - > request_length = sandbox - > previous_request_length ;
sandbox - > request_response_data_length = sandbox - > request_length ;
}
/* Initialize sandbox memory */
struct module * current_module = sandbox_get_module ( sandbox ) ;
module_initialize_globals ( current_module ) ;
module_initialize_memory ( current_module ) ;
sandbox_setup_arguments ( sandbox ) ;
/* Executing the function */
int32_t argument_count = module_get_argument_count ( current_module ) ;
current_sandbox_enable_preemption ( sandbox ) ;
sandbox - > return_value = module_main ( current_module , argument_count , sandbox - > arguments_offset ) ;
current_sandbox_disable_preemption ( sandbox ) ;
sandbox - > completion_timestamp = __getcycles ( ) ;
/* Function code execution failed, terminate the request */
if ( sandbox - > return_value < 0 ) {
/* TODO: Simply goto err is not perfect because not print out the response meesage of the function code.
* Should return 400 and the err message in the http response body .
*/
goto err ;
} else if ( next_module ! = NULL ) {
assert ( next_module_idx ) ;
assert ( next_module ) ;
size_t next_module_pre_count = next_module [ 0 ] - > pre_module_count ;
assert ( next_module_pre_count ) ;
if ( next_module_idx > 1 | | ( next_module_idx = = 1 & & next_module_pre_count = = 1 ) )
{
/* Generate a new request, copy the current sandbox's output to the next request's buffer, and put it to the global queue */
ssize_t output_length = sandbox - > request_response_data_length - sandbox - > request_length ;
char * pre_func_output = ( char * ) malloc ( output_length ) ;
if ( ! pre_func_output ) {
fprintf ( stderr , " Failed to allocate memory for the previous output: %s \n " , strerror ( errno ) ) ;
goto err ;
} ;
memcpy ( pre_func_output , sandbox - > request_response_data + sandbox - > request_length , output_length ) ;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
for ( size_t i = 0 ; i < next_module_idx ; i + + )
{
struct module * next_module_node = next_module [ i ] ;
assert ( next_module_node ) ;
char * individual_pre_func_output = ( char * ) malloc ( output_length ) ;
if ( ! individual_pre_func_output ) {
fprintf ( stderr , " Failed to allocate memory for the individual previous output: %s \n " , strerror ( errno ) ) ;
free ( pre_func_output ) ;
goto err ;
}
memcpy ( individual_pre_func_output , pre_func_output , output_length ) ;
uint64_t now = __getcycles ( ) ;
current_sandbox_get_newlaxity ( sandbox , now ) ;
struct sandbox_request * sandbox_request =
sandbox_request_allocate ( next_module_node , false , sandbox - > request_length ,
next_module_node - > name , sandbox - > client_socket_descriptor ,
( const struct sockaddr * ) & sandbox - > client_address ,
sandbox - > request_arrival_timestamp , now ,
sandbox - > remaining_slack , sandbox - > laxity , true , NULL , 0 ) ;
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy - wait to generate an unique id , should we optimize it here ?
*/
sandbox_request - > id = sandbox - > id ;
pre_functions_output_request_add ( sandbox_request , individual_pre_func_output , output_length , sandbox - > module - > run_priority ) ;
# ifdef OPT_AVOID_GLOBAL_QUEUE
/* TODO: The running time of the current sandbox contains the next sandbox's initialization time, does it matter? */
if ( sandbox - > absolute_deadline = = sandbox_request - > absolute_deadline ) {
/* Put the next sandbox to the local run queue to reduce the overhead of the global queue */
struct sandbox * next_sandbox = sandbox_allocate ( sandbox_request ) ;
if ( ! next_sandbox ) {
free ( sandbox_request ) ;
goto err ;
}
assert ( next_sandbox - > state = = SANDBOX_INITIALIZED ) ;
sandbox_set_as_runnable ( next_sandbox , SANDBOX_INITIALIZED ) ;
} else {
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add ( sandbox_request ) ;
}
# else
/* Add to the Global Sandbox Request Scheduler */
global_request_scheduler_add ( sandbox_request ) ;
}
# endif
/* Remove the client fd from epoll if it is the first sandbox in the chain */
if ( sandbox - > request_from_outside ) {
sandbox_remove_from_epoll ( sandbox ) ;
}
/*free memory of pre_func_out, Because it has been deeply copied its copy into requestbecause */
free ( pre_func_output ) ;
pre_func_output = NULL ;
sandbox_set_as_returned ( sandbox , SANDBOX_RUNNING ) ;
} else if ( next_module_idx = = 1 & & next_module_pre_count > 1 )
{
static bool lock_flag = true ;
if ( lock_flag )
{
LOCK_INIT ( & lock ) ;
lock_flag = false ;
}
/*Before each id is put into the hash table, the key needs to add a "module handle"*/
struct module * next_module_node = next_module [ 0 ] ;
assert ( next_module_node ) ;
char * cur_request_id = NULL ;
int key_len = snprintf ( NULL , 0 , " %s%lu " , next_module_node - > name , sandbox - > id ) + 1 ;
cur_request_id = ( char * ) malloc ( key_len ) ;
assert ( cur_request_id ) ;
snprintf ( cur_request_id , key_len , " %s%lu " , next_module_node - > name , sandbox - > id ) ;
uint32_t ret_value_len ;
uint32_t rest_pre_count = 888 ;
/*calculation the pre_function_out*/
ssize_t output_length = sandbox - > request_response_data_length - sandbox - > request_length ;
char * pre_func_output = ( char * ) malloc ( output_length ) ;
if ( ! pre_func_output ) {
fprintf ( stderr , " Failed to allocate memory for the previous output: %s \n " , strerror ( errno ) ) ;
goto err ;
} ;
memcpy ( pre_func_output , sandbox - > request_response_data + sandbox - > request_length , output_length ) ;
//debuglog("the ID %lu %s pre_func_output is %s\n", sandbox->id, sandbox->module->name, pre_func_output);
LOCK_LOCK ( & lock ) ;
uint64_t * request_id = ( uint64_t * ) map_get ( sandbox_request_id , cur_request_id , strlen ( cur_request_id ) , & ret_value_len ) ;
bool mapflag = false ;
if ( ! request_id ) {
uint64_t now = __getcycles ( ) ;
current_sandbox_get_newlaxity ( sandbox , now ) ;
//mem_log("time %lu request id:%d executing, name:%s remaining slack %lu\n", current_rs,
// sandbox->id, sandbox->module->name, sandbox->remaining_slack);
struct sandbox_request * sandbox_request =
sandbox_request_allocate ( next_module_node , false , sandbox - > request_length ,
next_module_node - > name , sandbox - > client_socket_descriptor ,
( const struct sockaddr * ) & sandbox - > client_address ,
sandbox - > request_arrival_timestamp , now ,
sandbox - > remaining_slack , sandbox - > laxity , true , NULL , 0 ) ;
/* TODO: All sandboxs in the chain share the same request id, but sandbox_request_allocate()
* will busy - wait to generate an unique id , should we optimize it here ?
*/
sandbox_request - > id = sandbox - > id ;
uint32_t module_pre_count = next_module_pre_count ;
module_pre_count - - ;
assert ( module_pre_count ) ;
if ( ! map_set ( sandbox_request_id , cur_request_id , strlen ( cur_request_id ) , & module_pre_count , sizeof ( uint32_t ) , true ) ) panic ( " the map of sandbox_request_id is NULL \n " ) ;
if ( ! map_set ( sandbox_req_map , cur_request_id , strlen ( cur_request_id ) , sandbox_request , sizeof ( struct sandbox_request * ) , false ) ) panic ( " the map of sandbox_request is NULL \n " ) ;
mapflag = true ;
}
LOCK_UNLOCK ( & lock ) ;
struct sandbox_request * sandbox_request = map_get ( sandbox_req_map , cur_request_id , strlen ( cur_request_id ) , & ret_value_len ) ;
if ( ! sandbox_request ) panic ( " the map of sandbox_request is NULL \n " ) ;
if ( mapflag )
{
pre_functions_output_request_add ( sandbox_request , pre_func_output , output_length , sandbox - > module - > run_priority ) ;
} else {
pthread_spin_lock ( & sandbox_request - > lock ) ;
pre_functions_output_request_add ( sandbox_request , pre_func_output , output_length , sandbox - > module - > run_priority ) ;
if ( ! request_id ) {
panic ( " Request ID not found or invalid \n " ) ;
} else {
rest_pre_count = * request_id ;
}
if ( rest_pre_count = = 888 ) panic ( " the rest_pre_count is not get requst_id \n " ) ;
rest_pre_count - - ;
if ( rest_pre_count ! = 0 )
{
map_upsert ( sandbox_request_id , cur_request_id , strlen ( cur_request_id ) , & rest_pre_count , sizeof ( uint32_t ) ) ;
} else {
uint64_t enqueue_timestamp = __getcycles ( ) ;
sandbox_request - > enqueue_timestamp = enqueue_timestamp ;
global_request_scheduler_add ( sandbox_request ) ;
map_delete ( sandbox_req_map , cur_request_id , strlen ( cur_request_id ) ) ;
map_delete ( sandbox_request_id , cur_request_id , strlen ( cur_request_id ) ) ;
}
pthread_spin_unlock ( & sandbox_request - > lock ) ;
}
free ( cur_request_id ) ;
cur_request_id = NULL ;
if ( sandbox - > request_from_outside ) {
sandbox_remove_from_epoll ( sandbox ) ;
}
sandbox_set_as_returned ( sandbox , SANDBOX_RUNNING ) ;
goto done ;
} else
{
error_message = " the strcuture of DAG is not supported \n " ;
goto err ;
}
} else {
/* Retrieve the result, construct the HTTP response, and send to client */
if ( sandbox_send_response ( sandbox ) < 0 ) {
error_message = " Unable to build and send client response \n " ;
goto err ;
} ;
http_total_increment_2xx ( ) ;
sandbox - > response_timestamp = __getcycles ( ) ;
assert ( sandbox - > state = = SANDBOX_RUNNING ) ;
sandbox_close_http ( sandbox ) ;
sandbox_set_as_returned ( sandbox , SANDBOX_RUNNING ) ;
}
done :
/* Cleanup connection and exit sandbox */
generic_thread_dump_lock_overhead ( ) ;
scheduler_yield ( ) ;
/* This assert prevents a segfault discussed in
* https : //github.com/phanikishoreg/awsm-Serverless-Framework/issues/66
*/
assert ( 0 ) ;
err :
debuglog ( " %s " , error_message ) ;
assert ( sandbox - > state = = SANDBOX_RUNNING ) ;
/* Send a 400 error back to the client */
client_socket_send ( sandbox - > client_socket_descriptor , 400 ) ;
sandbox_close_http ( sandbox ) ;
sandbox_set_as_error ( sandbox , SANDBOX_RUNNING ) ;
goto done ;
}