remove listener thread, let worker thread generate requests to global queue directly

worker_generates_requests_to_global_queue
Xiaosu Lyu 3 years ago
parent dac95d08f4
commit 9073972ba6

@ -10,7 +10,6 @@ if [[ $ARCH = "x86_64" ]]; then
elif [[ $ARCH = "aarch64" ]]; then
SHFMT_URL=https://github.com/patrickvane/shfmt/releases/download/master/shfmt_linux_arm
echo "ARM64 support is still a work in progress!"
exit 1
else
echo "This script only supports x86_64 and aarch64"
exit 1
@ -64,8 +63,6 @@ wget $SHFMT_URL -O shfmt && chmod +x shfmt && sudo mv shfmt /usr/local/bin/shfmt
sudo ./install_llvm.sh $LLVM_VERSION
curl -sS -L -O $WASI_SDK_URL && sudo dpkg -i wasi-sdk_12.0_amd64.deb && rm -f wasi-sdk_12.0_amd64.deb
if [ -z "${WASI_SDK_PATH}" ]; then
export WASI_SDK_PATH=/opt/wasi-sdk
echo "export WASI_SDK_PATH=/opt/wasi-sdk" >> ~/.bashrc

@ -94,7 +94,7 @@ BINARY_NAME=sledgert
# This flag tracks the total number of sandboxes in the various states
# It is useful to debug if sandboxes are "getting caught" in a particular state
# CFLAGS += -DSANDBOX_STATE_TOTALS
CFLAGS += -DSANDBOX_STATE_TOTALS
# This flag enables an per-worker atomic count of sandbox's local runqueue count in thread local storage
# Useful to debug if sandboxes are "getting caught" or "leaking" while in a local runqueue

@ -4,7 +4,6 @@
#include <assert.h>
#include "arch/common.h"
#include "current_sandbox.h"
#define ARCH_SIG_JMP_OFF 0x100 /* Based on code generated! */
@ -47,24 +46,20 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
* Assumption: In the case of a slow context switch, the caller
* set current_sandbox to the sandbox containing the target context
*/
if (b->variant == ARCH_CONTEXT_VARIANT_SLOW) {
/*if (b->variant == ARCH_CONTEXT_VARIANT_SLOW) {
struct sandbox *current = current_sandbox_get();
assert(current != NULL && b == &current->ctxt);
}
}*/
#endif
/* if both a and b are NULL, there is no state change */
assert(a != NULL || b != NULL);
assert(a != NULL && b != NULL);
/* Assumption: The caller does not switch to itself */
assert(a != b);
/* Set any NULLs to worker_thread_base_context to resume execution of main */
if (a == NULL) a = &worker_thread_base_context;
if (b == NULL) b = &worker_thread_base_context;
/* A Transition {Unused, Running} -> Fast */
assert(a->variant == ARCH_CONTEXT_VARIANT_UNUSED || a->variant == ARCH_CONTEXT_VARIANT_RUNNING);
assert(a->variant == ARCH_CONTEXT_VARIANT_RUNNING);
/* B Transition {Fast, Slow} -> Running */
assert(b->variant == ARCH_CONTEXT_VARIANT_FAST || b->variant == ARCH_CONTEXT_VARIANT_SLOW);
@ -87,6 +82,8 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
"ldr x1, [%[bv]]\n\t"
"sub x1, x1, #2\n\t"
"cbz x1, slow%=\n\t"
"mov x3, #3\n\t"
"str x3, [%[bv]]\n\t" /* b->variant = ARCH_CONTEXT_VARIANT_RUNNING; */
"ldr x0, [%[b]]\n\t"
"ldr x1, [%[b], 8]\n\t"
"mov sp, x0\n\t"
@ -95,8 +92,6 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
"br %[slowpath]\n\t"
".align 8\n\t"
"reset%=:\n\t"
"mov x1, #3\n\t"
"str x1, [%[bv]]\n\t"
".align 8\n\t"
"exit%=:\n\t"
:
@ -109,6 +104,37 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
return 0;
}
/**
* Load a new sandbox that preempted an existing sandbox, restoring only the
* instruction pointer and stack pointer registers.
* @param active_context - the context of the current worker thread
* @param sandbox_context - the context that we want to restore
*/
static inline void
arch_context_restore_fast(mcontext_t *active_context, struct arch_context *sandbox_context)
{
assert(active_context != NULL);
assert(sandbox_context != NULL);
/* Assumption: Base Context is only ever used by arch_context_switch */
assert(sandbox_context != &worker_thread_base_context);
assert(sandbox_context->regs[UREG_SP]);
assert(sandbox_context->regs[UREG_IP]);
/* Transitioning from Fast -> Running */
assert(sandbox_context->variant == ARCH_CONTEXT_VARIANT_FAST);
sandbox_context->variant = ARCH_CONTEXT_VARIANT_RUNNING;
//active_context->gregs[REG_RSP] = sandbox_context->regs[UREG_SP];
//active_context->gregs[REG_RIP] = sandbox_context->regs[UREG_IP];
active_context->sp = sandbox_context->regs[UREG_SP];
active_context->pc = sandbox_context->regs[UREG_IP];
}
#else
#warning "Neither AARCH64 nor aarch64 was defined, but aarch64/context.h was included!"
#endif

@ -10,6 +10,12 @@ struct auto_buf {
size_t size;
};
static inline void auto_buf_copy(struct auto_buf *dest, struct auto_buf *source) {
if (dest == NULL || source == NULL) return;
fwrite(source->data, 1, source->size, dest->handle);
fflush(dest->handle);
}
static inline int
auto_buf_init(struct auto_buf *buf)
{

@ -12,11 +12,28 @@ struct http_header {
int value_length;
};
static inline void http_header_copy(struct http_header *dest, struct http_header *source) {
if (source == NULL || dest == NULL) return;
dest->key = (char*) malloc (source->key_length);
memcpy(dest->key, source->key, source->key_length);
dest->key_length = source->key_length;
dest->value = (char*) malloc (source->value_length);
memcpy(dest->value, source->value, source->value_length);
dest->value_length = source->value_length;
}
struct http_query_param {
char value[HTTP_MAX_QUERY_PARAM_LENGTH];
int value_length;
};
static inline void http_query_param_copy(struct http_query_param *dest, struct http_query_param *source) {
if (source == NULL || dest == NULL) return;
memcpy(dest->value, source->value, HTTP_MAX_QUERY_PARAM_LENGTH);
dest->value_length = source->value_length;
}
struct http_request {
char full_url[HTTP_MAX_FULL_URL_LENGTH];
struct http_header headers[HTTP_MAX_HEADER_COUNT];
@ -40,4 +57,29 @@ struct http_request {
int cursor; /* Sandbox cursor (offset from body pointer) */
};
static inline void http_request_copy(struct http_request *dest, struct http_request *source) {
if (dest == NULL || source == NULL) return;
memcpy(dest->full_url, source->full_url, HTTP_MAX_FULL_URL_LENGTH);
for (int i = 0; i < HTTP_MAX_HEADER_COUNT; i++) {
http_header_copy(&(dest->headers[i]), &(source->headers[i]));
}
dest->header_count = source->header_count;
dest->method = source->method;
for (int i = 0; i < HTTP_MAX_QUERY_PARAM_COUNT; i++) {
http_query_param_copy(&(dest->query_params[i]), &(source->query_params[i]));
}
dest->query_params_count = source->query_params_count;
dest->body = (char*) malloc (source->body_length);
memcpy(dest->body, source->body, source->body_length);
dest->body_length = source->body_length;
dest->body_length_read = source->body_length_read;
dest->length_parsed = source->length_parsed;
dest->last_was_value = source->last_was_value;
dest->header_end = source->header_end;
dest->message_begin = source->message_begin;
dest->message_end = source->message_end;
}
void http_request_print(struct http_request *http_request);

@ -22,6 +22,7 @@
#include "tcp_session.h"
#include "tenant.h"
enum http_session_state
{
HTTP_SESSION_UNINITIALIZED = 0,
@ -59,8 +60,22 @@ struct http_session {
uint64_t response_sent_timestamp;
};
static inline void http_session_copy(struct http_session *dest, struct http_session *source) {
if (dest == NULL || source == NULL) return;
dest->tag = source->tag;
http_request_copy(&dest->http_request, &source->http_request);
auto_buf_copy(&dest->request_buffer, &source->request_buffer);
auto_buf_copy(&dest->response_header, &source->response_header);
dest->response_header_written = source->response_header_written;
auto_buf_copy(&dest->response_body, &source->response_body);
dest->response_body_written = source->response_body_written;
dest->route = source->route;
}
extern void http_session_perf_log_print_entry(struct http_session *http_session);
/**
* Initalize state associated with an http parser
* Because the http_parser structure uses pointers to the request buffer, if realloc moves the request
@ -102,6 +117,7 @@ http_session_init(struct http_session *session, int socket_descriptor, const str
/* Defer initializing response_body until we've matched a route */
auto_buf_init(&session->response_header);
session->state = HTTP_SESSION_INITIALIZED;
@ -265,7 +281,7 @@ http_session_send_response_body(struct http_session *session, void_star_cb on_ea
/* Assumption: Already flushed in order to write content-length to header */
// TODO: Test if body is empty
printf("body is %s\n", session->response_body.data);
while (session->response_body_written < session->response_body.size) {
ssize_t sent =
tcp_session_send(session->socket,
@ -384,7 +400,6 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai
if (old_buffer != session->request_buffer.data) {
http_request->body = header_length ? session->request_buffer.data + header_length : NULL;
}
if (http_session_parse(session, bytes_received) == -1) goto err;
}

@ -73,7 +73,7 @@ lock_unlock(lock_t *self, lock_node_t *node)
ck_spinlock_mcs_unlock(&self->lock, &node->node);
uint64_t now = __getcycles();
assert(node->time_locked < now);
assert(node->time_locked <= now);
uint64_t duration = now - node->time_locked;
node->time_locked = 0;
if (unlikely(duration > self->longest_held)) { self->longest_held = duration; }

@ -48,6 +48,7 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox)
runtime_processor_speed_MHz);
}
static inline void
sandbox_perf_log_init()
{

@ -60,7 +60,7 @@ sandbox_set_as_error(struct sandbox *sandbox, sandbox_state_t last_state)
/* Return HTTP session to listener core to be written back to client */
http_session_set_response_header(sandbox->http, 500);
sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session);
//http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session);
sandbox->http = NULL;
/* Terminal State Logging */

@ -34,7 +34,7 @@ sandbox_set_as_initialized(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -43,7 +43,7 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;
@ -53,7 +53,7 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state)
http_session_set_response_header(sandbox->http, 200);
sandbox->http->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session);
//http_session_send_response(sandbox->http, (void_star_cb)listener_thread_register_http_session);
sandbox->http = NULL;
/* State Change Hooks */

@ -39,7 +39,7 @@ sandbox_set_as_running_sys(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -35,7 +35,7 @@ sandbox_set_as_running_user(struct sandbox *sandbox, sandbox_state_t last_state)
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -31,7 +31,7 @@ software_interrupt_mask_signal(int signal)
sigset_t set;
int return_code;
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV);
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV || signal == SIGINT);
/* all threads created by the calling thread will have signal blocked */
sigemptyset(&set);
sigaddset(&set, signal);
@ -55,7 +55,7 @@ software_interrupt_unmask_signal(int signal)
sigset_t set;
int return_code;
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV);
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV || signal == SIGINT);
/* all threads created by the calling thread will have signal unblocked */
sigemptyset(&set);
sigaddset(&set, signal);

@ -11,9 +11,15 @@
#include "sandbox_set_as_running_user.h"
#include "sandbox_set_as_running_sys.h"
#include "scheduler.h"
#include "http_session.h"
#include "software_interrupt.h"
#include "wasi.h"
extern struct http_session *g_session;
extern struct tenant *g_tenant;
extern int g_client_socket;
extern struct sockaddr *g_client_address;
thread_local struct sandbox *worker_thread_current_sandbox = NULL;
/**
@ -63,6 +69,33 @@ current_sandbox_exit()
sandbox_state_stringify(exiting_sandbox->state));
}
/* generate a new request and enqueue it to the global queue */
assert(g_tenant != NULL);
struct tenant *tenant = g_tenant;
uint64_t request_arrival_timestamp = __getcycles();
http_total_increment_request();
/* Allocate http session */
struct http_session *session = http_session_alloc(g_client_socket, (const struct sockaddr *)&g_client_address, tenant, request_arrival_timestamp);
assert(session != NULL);
http_session_copy(session, g_session);
assert(session->route != NULL);
struct sandbox *sandbox = sandbox_alloc(session->route->module, session, session->route, session->tenant, 1);
if (unlikely(sandbox == NULL)) {
printf("Failed to allocate sandbox\n");
exit(-1);
}
/* If the global request scheduler is full, return a 429 to the client */
if (unlikely(global_request_scheduler_add(sandbox) == NULL)) {
printf("Failed to add sandbox to global queue\n");
exit(-1);
}
/****************************end**************************/
scheduler_cooperative_sched(true);
/* The scheduler should never switch back to completed sandboxes */

@ -19,7 +19,7 @@ global_request_scheduler_minheap_add(struct sandbox *sandbox)
{
assert(sandbox);
assert(global_request_scheduler_minheap);
if (unlikely(!listener_thread_is_running())) panic("%s is only callable by the listener thread\n", __func__);
//if (unlikely(!listener_thread_is_running())) panic("%s is only callable by the listener thread\n", __func__);
int return_code = priority_queue_enqueue(global_request_scheduler_minheap, sandbox);

@ -13,6 +13,14 @@
#include "tenant_functions.h"
#include "http_session_perf_log.h"
extern void http_session_copy(struct http_session *dest, struct http_session *source);
/////////////////xiaosu for test//////////////////
struct http_session *g_session = NULL;
struct tenant *g_tenant = NULL;
int g_client_socket = -1;
struct sockaddr *g_client_address = NULL;
////////////////xiaosu end for test///////////////
static void listener_thread_unregister_http_session(struct http_session *http);
static void panic_on_epoll_error(struct epoll_event *evt);
@ -168,6 +176,14 @@ panic_on_epoll_error(struct epoll_event *evt)
static void
on_client_request_arrival(int client_socket, const struct sockaddr *client_address, struct tenant *tenant)
{
if (g_client_socket == -1) {
g_client_socket = client_socket;
}
if (g_client_address == NULL) {
g_client_address = client_address;
}
uint64_t request_arrival_timestamp = __getcycles();
http_total_increment_request();
@ -242,6 +258,13 @@ on_client_request_received(struct http_session *session)
/* Allocate a Sandbox */
session->state = HTTP_SESSION_EXECUTING;
if (g_session == NULL) {
/* Allocate HTTP Session */
g_session = http_session_alloc(session->socket, (const struct sockaddr *)&(session->client_address),
session->tenant, session->request_arrival_timestamp);
http_session_copy(g_session, session);
}
struct sandbox *sandbox = sandbox_alloc(route->module, session, route, session->tenant, work_admitted);
if (unlikely(sandbox == NULL)) {
debuglog("Failed to allocate sandbox\n");
@ -317,6 +340,9 @@ on_tenant_socket_epoll_event(struct epoll_event *evt)
struct tenant *tenant = evt->data.ptr;
assert(tenant);
if (g_tenant == NULL) {
g_tenant = tenant;
}
/* Accept Client Request as a nonblocking socket, saving address information */
struct sockaddr_in client_address;

@ -35,6 +35,7 @@ uint32_t runtime_first_worker_processor = 1;
uint32_t runtime_processor_speed_MHz = 0;
uint32_t runtime_total_online_processors = 0;
uint32_t runtime_worker_threads_count = 0;
time_t t_start;
enum RUNTIME_SIGALRM_HANDLER runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_BROADCAST;
@ -476,6 +477,8 @@ main(int argc, char **argv)
printf("Runtime Environment:\n");
runtime_processor_speed_MHz = 1500;
runtime_set_resource_limits_to_max();
runtime_allocate_available_cores();
runtime_configure();
@ -484,7 +487,7 @@ main(int argc, char **argv)
listener_thread_initialize();
runtime_start_runtime_worker_threads();
runtime_get_processor_speed_MHz();
//runtime_get_processor_speed_MHz();
runtime_configure_worker_spinloop_pause();
software_interrupt_arm_timer();
@ -516,6 +519,7 @@ main(int argc, char **argv)
}
runtime_boot_timestamp = __getcycles();
t_start = time(NULL);
for (int tenant_idx = 0; tenant_idx < tenant_config_vec_len; tenant_idx++) {
tenant_config_deinit(&tenant_config_vec[tenant_idx]);

@ -33,7 +33,12 @@ thread_local _Atomic volatile sig_atomic_t deferred_sigalrm = 0;
**************************************/
extern pthread_t *runtime_worker_threads;
extern time_t t_start;
extern pthread_t listener_thread_id;
#ifdef SANDBOX_STATE_TOTALS
extern _Atomic uint32_t sandbox_state_totals[SANDBOX_STATE_COUNT];
#endif
/**************************
* Private Static Inlines *
*************************/
@ -75,6 +80,32 @@ propagate_sigalrm(siginfo_t *signal_info)
}
}
/**
* A POSIX signal is delivered to only one thread.
* This function broadcasts the sigint signal to all other worker threads
*/
static inline void
sigint_propagate_workers_listener(siginfo_t *signal_info)
{
/* Signal was sent directly by the kernel user space, so forward to other threads */
if (signal_info->si_code == SI_KERNEL || signal_info->si_code == SI_USER) {
for (int i = 0; i < runtime_worker_threads_count; i++) {
if (pthread_self() == runtime_worker_threads[i]) continue;
/* All threads should have been initialized */
assert(runtime_worker_threads[i] != 0);
pthread_kill(runtime_worker_threads[i], SIGINT);
}
/* send to listener thread */
if (pthread_self() != listener_thread_id) {
pthread_kill(listener_thread_id, SIGINT);
}
} else {
/* Signal forwarded from another thread. Just confirm it resulted from pthread_kill */
assert(signal_info->si_code == SI_TKILL);
}
}
static inline bool
worker_thread_is_running_cooperative_scheduler(void)
{
@ -184,6 +215,18 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
break;
}
case SIGINT: {
/* Stop the alarm timer first */
software_interrupt_disarm_timer();
sigint_propagate_workers_listener(signal_info);
/* calculate the throughput */
time_t t_end = time(NULL);
double seconds = difftime(t_end, t_start);
double throughput = atomic_load(&sandbox_state_totals[SANDBOX_COMPLETE]) / seconds;
uint32_t total_sandboxes_error = atomic_load(&sandbox_state_totals[SANDBOX_ERROR]);
printf("throughput is %f, error request is %u\n", throughput, total_sandboxes_error);
exit(0);
}
default: {
const char *signal_name = strsignal(signal_type);
switch (signal_info->si_code) {
@ -268,9 +311,10 @@ software_interrupt_initialize(void)
sigaddset(&signal_action.sa_mask, SIGUSR1);
sigaddset(&signal_action.sa_mask, SIGFPE);
sigaddset(&signal_action.sa_mask, SIGSEGV);
sigaddset(&signal_action.sa_mask, SIGINT);
const int supported_signals[] = { SIGALRM, SIGUSR1, SIGFPE, SIGSEGV };
const size_t supported_signals_len = 4;
const int supported_signals[] = { SIGALRM, SIGUSR1, SIGFPE, SIGSEGV, SIGINT };
const size_t supported_signals_len = 5;
for (int i = 0; i < supported_signals_len; i++) {
int signal = supported_signals[i];

@ -65,6 +65,9 @@ worker_thread_main(void *argument)
software_interrupt_unmask_signal(SIGFPE);
software_interrupt_unmask_signal(SIGSEGV);
/* Unmask SIGINT signals */
software_interrupt_unmask_signal(SIGINT);
/* Unmask signals, unless the runtime has disabled preemption */
if (runtime_preemption_enabled) {
software_interrupt_unmask_signal(SIGALRM);

Loading…
Cancel
Save