You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sledge/runtime/include/sandbox_request.h

233 lines
8.2 KiB

#pragma once
#include <errno.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/socket.h>
#include "debuglog.h"
#include "deque.h"
#include "http_total.h"
#include "module.h"
#include "runtime.h"
#include "sandbox_state.h"
#include "lock.h"
/*
 * One node of the singly linked list of predecessor-function outputs that
 * hangs off struct sandbox_request (field pre_functions_output).
 */
struct sandbox_pre_functions_output {
	/* Output bytes produced by one predecessor function */
	char *previous_function_output;
	/* Number of bytes reachable through previous_function_output */
	ssize_t output_length;
	/* Key the list is kept sorted by (ascending) */
	uint32_t run_priority;
	/* Following node in the list, or NULL at the tail */
	struct sandbox_pre_functions_output *next;
};
/*
 * Everything the runtime records about one pending invocation of a module.
 * Instances are created by sandbox_request_allocate().
 */
struct sandbox_request {
	/* Taken from the global request counter at allocation time */
	uint64_t id;
	/* Whether the request came from outside: true is yes, false is no */
	bool request_from_outside;
	/* The module we want to request */
	struct module *module;
	/* Arguments that are passed to the serverless function */
	char *arguments;

	/* Client connection */
	int             socket_descriptor;
	struct sockaddr socket_address;

	/* Timing state, all measured in cycles */
	uint64_t request_arrival_timestamp;
	uint64_t enqueue_timestamp;
	/* request_arrival_timestamp + module->relative_deadline */
	uint64_t absolute_deadline;
	/* Starts out equal to enqueue_timestamp */
	uint64_t last_update_timestamp;
	uint64_t remaining_slack;

	/* Head of the run_priority-sorted list of predecessor outputs; NULL when empty */
	struct sandbox_pre_functions_output *pre_functions_output;
	/* Spinlock initialized by sandbox_request_allocate() */
	pthread_spinlock_t lock;

	/* Output handed over from the previous function(s), and its size in bytes */
	char   *previous_function_output;
	ssize_t output_length;
	/* The request_length value supplied to sandbox_request_allocate() */
	ssize_t previous_request_length;

	/*
	 * Unitless estimate of the instantaneous fraction of system capacity required to run the request.
	 * Calculated by estimated execution time (cycles) * runtime_admissions_granularity / relative deadline
	 * (cycles)
	 */
	uint64_t admissions_estimate;
};
/*
 * NOTE(review): DEQUE_PROTOTYPE is defined outside this file (presumably in deque.h); it appears to generate a
 * deque type/API named after "sandbox" whose elements are struct sandbox_request pointers - confirm there.
 */
DEQUE_PROTOTYPE(sandbox, struct sandbox_request *)
/*
 * Count of the total number of requests we've ever allocated. Never decrements as it is used to generate IDs.
 * Only declared here (extern); the definition lives in another translation unit.
 * The counter is 32 bits wide while struct sandbox_request stores the resulting ID in a uint64_t.
 */
extern _Atomic uint32_t sandbox_request_count;
/**
 * Initializes the global sandbox request counter to zero.
 * Declared with an explicit (void) parameter list so the compiler sees a real prototype.
 */
static inline void
sandbox_request_count_initialize(void)
{
	atomic_init(&sandbox_request_count, 0);
}
/**
 * Atomically adds one to the global sandbox request counter.
 * Declared with an explicit (void) parameter list so the compiler sees a real prototype.
 * @return the counter value from before the increment
 */
static inline uint32_t
sandbox_request_count_postfix_increment(void)
{
	return atomic_fetch_add(&sandbox_request_count, 1);
}
/**
 * Writes a debug log line naming a sandbox request's ID together with the name and port of its module.
 * The body is compiled only when LOG_REQUEST_ALLOCATION is defined; otherwise the call does nothing.
 * @param sandbox_request the request to describe
 */
static inline void
sandbox_request_log_allocation(struct sandbox_request *sandbox_request)
{
#ifdef LOG_REQUEST_ALLOCATION
	struct module *requested_module = sandbox_request->module;

	debuglog("Sandbox Request %lu: of %s:%d\n", sandbox_request->id, requested_module->name,
	         requested_module->port);
#endif
}
/**
 * Allocates a new Sandbox Request on the heap and initializes every field.
 * The request is only created here; this function does not enqueue it anywhere.
 *
 * Allocation failure is reported through panic() rather than assert(), because an assert is compiled out
 * when NDEBUG is defined and the NULL pointer would then be dereferenced.
 *
 * @param module the module we want to request; its relative_deadline is added to request_arrival_timestamp
 *        to form the absolute deadline
 * @param request_from_outside whether the request came from outside (true is yes, false is no)
 * @param request_length stored in the request as previous_request_length
 * @param arguments the arguments that we'll pass to the serverless function (pointer stored, not copied)
 * @param socket_descriptor
 * @param socket_address copied by value into the request; must not be NULL
 * @param request_arrival_timestamp the timestamp of when we received the request from the network (in cycles)
 * @param enqueue_timestamp in cycles; also used as the initial last_update_timestamp
 * @param remaining_slack in cycles
 * @param admissions_estimate admissions control estimate; must be non-zero
 * @param previous_function_output output of the previous function (pointer stored, not copied)
 * @param output_length length of previous_function_output
 * @return the new sandbox request
 */
static inline struct sandbox_request *
sandbox_request_allocate(struct module *module, bool request_from_outside, ssize_t request_length,
                         char *arguments, int socket_descriptor, const struct sockaddr *socket_address,
                         uint64_t request_arrival_timestamp, uint64_t enqueue_timestamp, uint64_t remaining_slack,
                         uint64_t admissions_estimate, char *previous_function_output, ssize_t output_length)
{
	/*
	 * Admissions Control State
	 * Assumption: an estimate of 0 should have been interpreted as a rejection
	 */
	assert(admissions_estimate != 0);

	struct sandbox_request *sandbox_request = malloc(sizeof *sandbox_request);
	if (sandbox_request == NULL) {
		panic("Could not allocate memory for sandbox request");
		return NULL;
	}

	/* Sets the ID to the value before the increment */
	sandbox_request->id                   = sandbox_request_count_postfix_increment();
	sandbox_request->module               = module;
	sandbox_request->request_from_outside = request_from_outside;
	sandbox_request->arguments            = arguments;
	sandbox_request->socket_descriptor    = socket_descriptor;
	memcpy(&sandbox_request->socket_address, socket_address, sizeof(struct sockaddr));

	sandbox_request->request_arrival_timestamp = request_arrival_timestamp;
	sandbox_request->enqueue_timestamp         = enqueue_timestamp;
	sandbox_request->absolute_deadline         = request_arrival_timestamp + module->relative_deadline;
	sandbox_request->last_update_timestamp     = enqueue_timestamp;
	sandbox_request->remaining_slack           = remaining_slack;

	sandbox_request->previous_function_output = previous_function_output;
	sandbox_request->output_length            = output_length;
	sandbox_request->previous_request_length  = request_length;

	/* The list of predecessor outputs starts out empty; never leave the head uninitialized */
	sandbox_request->pre_functions_output = NULL;
	pthread_spin_init(&sandbox_request->lock, PTHREAD_PROCESS_PRIVATE);

	sandbox_request->admissions_estimate = admissions_estimate;

	sandbox_request_log_allocation(sandbox_request);
	return sandbox_request;
}
/**
 * Records the output of one predecessor function on a request.
 *
 * A new node is inserted into request->pre_functions_output so that the list stays sorted by ascending
 * run_priority. A node whose priority equals that of existing nodes is placed in front of them.
 *
 * The node stores the output pointer itself - the bytes are NOT copied - so the buffer has to stay valid for
 * as long as the node is reachable. (An earlier revision allocated a private buffer here and then overwrote
 * the pointer to it, leaking output_length bytes per call; that dead allocation has been removed.)
 *
 * If output is NULL or output_length is <= 0, a debug message is logged and nothing is added.
 *
 * This function does not acquire request->lock.
 * NOTE(review): presumably callers serialize access to the request themselves - confirm.
 *
 * @param request the request whose pre_functions_output list receives the node
 * @param output the output of the previous function (borrowed, not copied)
 * @param output_length the length of the output in bytes
 * @param run_priority sort key of the new node; must be less than 6
 **/
static inline void
pre_functions_output_request_add(struct sandbox_request *request, char *output, ssize_t output_length,
                                 uint32_t run_priority)
{
	assert(run_priority < 6);

	if (output == NULL || output_length <= 0) {
		debuglog("output is null or output_length is <= 0");
		return;
	}

	struct sandbox_pre_functions_output *new_node = malloc(sizeof *new_node);
	if (new_node == NULL) {
		panic("Could not allocate memory for new node");
		return;
	}

	new_node->previous_function_output = output;
	new_node->output_length            = output_length;
	new_node->run_priority             = run_priority;

	/* Advance over every node with a strictly lower priority, then splice the new node in at that link */
	struct sandbox_pre_functions_output **link = &request->pre_functions_output;
	while (*link != NULL && (*link)->run_priority < run_priority) {
		link = &(*link)->next;
	}
	new_node->next = *link;
	*link          = new_node;
}
/**
 * Collapses the request's list of predecessor outputs (request->pre_functions_output) into one freshly
 * allocated buffer and installs it as request->previous_function_output.
 *
 * Layout of the new buffer:
 *  - exactly one node in the list: that node's bytes, copied verbatim
 *  - any other node count: for each node in list order, a 4-byte length (a uint32_t in host byte order,
 *    i.e. output_length truncated to 32 bits) followed by that many output bytes
 *
 * The buffer previously held in request->previous_function_output is freed in both cases, and
 * request->output_length is set to the size of the new buffer. The list nodes and the buffers they point to
 * are left untouched.
 *
 * @param request the request whose predecessor outputs are spliced together
 **/
static inline void
concatenate_outputs(struct sandbox_request *request)
{
	struct sandbox_pre_functions_output *const first = request->pre_functions_output;

	/* A lone output is passed through as-is; anything else gets a length prefix per record */
	const bool   length_prefixed = !(first != NULL && first->next == NULL);
	const size_t prefix_size     = length_prefixed ? sizeof(uint32_t) : 0;

	size_t total_length = 0;
	for (struct sandbox_pre_functions_output *node = first; node != NULL; node = node->next) {
		total_length += node->output_length + prefix_size;
	}

	/* malloc(0) is allowed to return NULL; ask for at least one byte so NULL always means failure */
	char *concatenated_output = malloc(total_length > 0 ? total_length : 1);
	if (concatenated_output == NULL) {
		panic("Could not allocate memory for concatenated output");
		return;
	}

	char *copy_dest = concatenated_output;
	for (struct sandbox_pre_functions_output *node = first; node != NULL; node = node->next) {
		size_t copy_length = node->output_length;

		if (length_prefixed) {
			/* copy_dest is generally not 4-byte aligned, so store the prefix with memcpy */
			uint32_t record_length = (uint32_t)copy_length;
			memcpy(copy_dest, &record_length, sizeof record_length);
			copy_dest += sizeof record_length;
		}

		memcpy(copy_dest, node->previous_function_output, copy_length);
		copy_dest += copy_length;
	}

	/* Release the buffer being replaced (free(NULL) is a no-op) */
	free(request->previous_function_output);
	request->previous_function_output = concatenated_output;
	request->output_length            = total_length;
}