Compare commits

...

2 Commits

@ -10,7 +10,6 @@ if [[ $ARCH = "x86_64" ]]; then
elif [[ $ARCH = "aarch64" ]]; then elif [[ $ARCH = "aarch64" ]]; then
SHFMT_URL=https://github.com/patrickvane/shfmt/releases/download/master/shfmt_linux_arm SHFMT_URL=https://github.com/patrickvane/shfmt/releases/download/master/shfmt_linux_arm
echo "ARM64 support is still a work in progress!" echo "ARM64 support is still a work in progress!"
exit 1
else else
echo "This script only supports x86_64 and aarch64" echo "This script only supports x86_64 and aarch64"
exit 1 exit 1
@ -64,8 +63,6 @@ wget $SHFMT_URL -O shfmt && chmod +x shfmt && sudo mv shfmt /usr/local/bin/shfmt
sudo ./install_llvm.sh $LLVM_VERSION sudo ./install_llvm.sh $LLVM_VERSION
curl -sS -L -O $WASI_SDK_URL && sudo dpkg -i wasi-sdk_12.0_amd64.deb && rm -f wasi-sdk_12.0_amd64.deb
if [ -z "${WASI_SDK_PATH}" ]; then if [ -z "${WASI_SDK_PATH}" ]; then
export WASI_SDK_PATH=/opt/wasi-sdk export WASI_SDK_PATH=/opt/wasi-sdk
echo "export WASI_SDK_PATH=/opt/wasi-sdk" >> ~/.bashrc echo "export WASI_SDK_PATH=/opt/wasi-sdk" >> ~/.bashrc

@ -94,7 +94,7 @@ BINARY_NAME=sledgert
# This flag tracks the total number of sandboxes in the various states # This flag tracks the total number of sandboxes in the various states
# It is useful to debug if sandboxes are "getting caught" in a particular state # It is useful to debug if sandboxes are "getting caught" in a particular state
# CFLAGS += -DSANDBOX_STATE_TOTALS CFLAGS += -DSANDBOX_STATE_TOTALS
# This flag enables an per-worker atomic count of sandbox's local runqueue count in thread local storage # This flag enables an per-worker atomic count of sandbox's local runqueue count in thread local storage
# Useful to debug if sandboxes are "getting caught" or "leaking" while in a local runqueue # Useful to debug if sandboxes are "getting caught" or "leaking" while in a local runqueue

@ -4,7 +4,6 @@
#include <assert.h> #include <assert.h>
#include "arch/common.h" #include "arch/common.h"
#include "current_sandbox.h"
#define ARCH_SIG_JMP_OFF 0x100 /* Based on code generated! */ #define ARCH_SIG_JMP_OFF 0x100 /* Based on code generated! */
@ -47,24 +46,20 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
* Assumption: In the case of a slow context switch, the caller * Assumption: In the case of a slow context switch, the caller
* set current_sandbox to the sandbox containing the target context * set current_sandbox to the sandbox containing the target context
*/ */
if (b->variant == ARCH_CONTEXT_VARIANT_SLOW) { /*if (b->variant == ARCH_CONTEXT_VARIANT_SLOW) {
struct sandbox *current = current_sandbox_get(); struct sandbox *current = current_sandbox_get();
assert(current != NULL && b == &current->ctxt); assert(current != NULL && b == &current->ctxt);
} }*/
#endif #endif
/* if both a and b are NULL, there is no state change */ /* if both a and b are NULL, there is no state change */
assert(a != NULL || b != NULL); assert(a != NULL && b != NULL);
/* Assumption: The caller does not switch to itself */ /* Assumption: The caller does not switch to itself */
assert(a != b); assert(a != b);
/* Set any NULLs to worker_thread_base_context to resume execution of main */
if (a == NULL) a = &worker_thread_base_context;
if (b == NULL) b = &worker_thread_base_context;
/* A Transition {Unused, Running} -> Fast */ /* A Transition {Unused, Running} -> Fast */
assert(a->variant == ARCH_CONTEXT_VARIANT_UNUSED || a->variant == ARCH_CONTEXT_VARIANT_RUNNING); assert(a->variant == ARCH_CONTEXT_VARIANT_RUNNING);
/* B Transition {Fast, Slow} -> Running */ /* B Transition {Fast, Slow} -> Running */
assert(b->variant == ARCH_CONTEXT_VARIANT_FAST || b->variant == ARCH_CONTEXT_VARIANT_SLOW); assert(b->variant == ARCH_CONTEXT_VARIANT_FAST || b->variant == ARCH_CONTEXT_VARIANT_SLOW);
@ -87,6 +82,8 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
"ldr x1, [%[bv]]\n\t" "ldr x1, [%[bv]]\n\t"
"sub x1, x1, #2\n\t" "sub x1, x1, #2\n\t"
"cbz x1, slow%=\n\t" "cbz x1, slow%=\n\t"
"mov x3, #3\n\t"
"str x3, [%[bv]]\n\t" /* b->variant = ARCH_CONTEXT_VARIANT_RUNNING; */
"ldr x0, [%[b]]\n\t" "ldr x0, [%[b]]\n\t"
"ldr x1, [%[b], 8]\n\t" "ldr x1, [%[b], 8]\n\t"
"mov sp, x0\n\t" "mov sp, x0\n\t"
@ -95,8 +92,6 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
"br %[slowpath]\n\t" "br %[slowpath]\n\t"
".align 8\n\t" ".align 8\n\t"
"reset%=:\n\t" "reset%=:\n\t"
"mov x1, #3\n\t"
"str x1, [%[bv]]\n\t"
".align 8\n\t" ".align 8\n\t"
"exit%=:\n\t" "exit%=:\n\t"
: :
@ -109,6 +104,37 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
return 0; return 0;
} }
/**
* Load a new sandbox that preempted an existing sandbox, restoring only the
* instruction pointer and stack pointer registers.
* @param active_context - the context of the current worker thread
* @param sandbox_context - the context that we want to restore
*/
static inline void
arch_context_restore_fast(mcontext_t *active_context, struct arch_context *sandbox_context)
{
assert(active_context != NULL);
assert(sandbox_context != NULL);
/* Assumption: Base Context is only ever used by arch_context_switch */
assert(sandbox_context != &worker_thread_base_context);
assert(sandbox_context->regs[UREG_SP]);
assert(sandbox_context->regs[UREG_IP]);
/* Transitioning from Fast -> Running */
assert(sandbox_context->variant == ARCH_CONTEXT_VARIANT_FAST);
sandbox_context->variant = ARCH_CONTEXT_VARIANT_RUNNING;
//active_context->gregs[REG_RSP] = sandbox_context->regs[UREG_SP];
//active_context->gregs[REG_RIP] = sandbox_context->regs[UREG_IP];
active_context->sp = sandbox_context->regs[UREG_SP];
active_context->pc = sandbox_context->regs[UREG_IP];
}
#else #else
#warning "Neither AARCH64 nor aarch64 was defined, but aarch64/context.h was included!" #warning "Neither AARCH64 nor aarch64 was defined, but aarch64/context.h was included!"
#endif #endif

@ -197,8 +197,6 @@ http_session_set_response_header(struct http_session *session, int status_code)
rc = fprintf(session->response_header.handle, HTTP_RESPONSE_CONTENT_LENGTH, rc = fprintf(session->response_header.handle, HTTP_RESPONSE_CONTENT_LENGTH,
session->response_body.size); session->response_body.size);
assert(rc > 0); assert(rc > 0);
rc = fputs(HTTP_RESPONSE_200_OK, session->response_header.handle);
assert(rc != EOF);
} }
rc = fputs(HTTP_RESPONSE_TERMINATOR, session->response_header.handle); rc = fputs(HTTP_RESPONSE_TERMINATOR, session->response_header.handle);

@ -8,6 +8,9 @@
#include "runtime.h" #include "runtime.h"
extern uint64_t total_held[1024];
extern uint64_t longest_held[1024];
extern thread_local int thread_id;
/* A linked list of nodes */ /* A linked list of nodes */
struct lock_wrapper { struct lock_wrapper {
uint64_t longest_held; uint64_t longest_held;
@ -73,9 +76,14 @@ lock_unlock(lock_t *self, lock_node_t *node)
ck_spinlock_mcs_unlock(&self->lock, &node->node); ck_spinlock_mcs_unlock(&self->lock, &node->node);
uint64_t now = __getcycles(); uint64_t now = __getcycles();
assert(node->time_locked < now); assert(node->time_locked <= now);
uint64_t duration = now - node->time_locked; uint64_t duration = now - node->time_locked;
node->time_locked = 0; node->time_locked = 0;
if (unlikely(duration > self->longest_held)) { self->longest_held = duration; } if (unlikely(duration > self->longest_held)) { self->longest_held = duration; }
self->total_held += duration; self->total_held += duration;
if (unlikely(duration > longest_held[thread_id])) {
longest_held[thread_id] = duration;
}
total_held[thread_id] += duration;
} }

@ -48,6 +48,7 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox)
runtime_processor_speed_MHz); runtime_processor_speed_MHz);
} }
static inline void static inline void
sandbox_perf_log_init() sandbox_perf_log_init()
{ {

@ -34,7 +34,7 @@ sandbox_set_as_initialized(struct sandbox *sandbox, sandbox_state_t last_state)
} }
/* State Change Bookkeeping */ /* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change); assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change; sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration; sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now; sandbox->timestamp_of.last_state_change = now;

@ -43,7 +43,7 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state)
} }
/* State Change Bookkeeping */ /* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change); assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change; sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration; sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now; sandbox->timestamp_of.last_state_change = now;

@ -39,7 +39,7 @@ sandbox_set_as_running_sys(struct sandbox *sandbox, sandbox_state_t last_state)
} }
/* State Change Bookkeeping */ /* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change); assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change; sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration; sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now; sandbox->timestamp_of.last_state_change = now;

@ -35,7 +35,7 @@ sandbox_set_as_running_user(struct sandbox *sandbox, sandbox_state_t last_state)
/* State Change Bookkeeping */ /* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change); assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change; sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration; sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now; sandbox->timestamp_of.last_state_change = now;

@ -31,7 +31,7 @@ software_interrupt_mask_signal(int signal)
sigset_t set; sigset_t set;
int return_code; int return_code;
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV); assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV || signal == SIGINT);
/* all threads created by the calling thread will have signal blocked */ /* all threads created by the calling thread will have signal blocked */
sigemptyset(&set); sigemptyset(&set);
sigaddset(&set, signal); sigaddset(&set, signal);
@ -55,7 +55,7 @@ software_interrupt_unmask_signal(int signal)
sigset_t set; sigset_t set;
int return_code; int return_code;
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV); assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV || signal == SIGINT);
/* all threads created by the calling thread will have signal unblocked */ /* all threads created by the calling thread will have signal unblocked */
sigemptyset(&set); sigemptyset(&set);
sigaddset(&set, signal); sigaddset(&set, signal);

@ -13,6 +13,9 @@
#include "tenant_functions.h" #include "tenant_functions.h"
#include "http_session_perf_log.h" #include "http_session_perf_log.h"
extern thread_local int thread_id;
extern bool first_request_comming;
time_t t_start;
static void listener_thread_unregister_http_session(struct http_session *http); static void listener_thread_unregister_http_session(struct http_session *http);
static void panic_on_epoll_error(struct epoll_event *evt); static void panic_on_epoll_error(struct epoll_event *evt);
@ -191,6 +194,11 @@ on_client_request_arrival(int client_socket, const struct sockaddr *client_addre
static void static void
on_client_request_receiving(struct http_session *session) on_client_request_receiving(struct http_session *session)
{ {
if (first_request_comming == false){
t_start = time(NULL);
first_request_comming = true;
}
/* Read HTTP request */ /* Read HTTP request */
int rc = http_session_receive_request(session, (void_star_cb)listener_thread_register_http_session); int rc = http_session_receive_request(session, (void_star_cb)listener_thread_register_http_session);
if (likely(rc == 0)) { if (likely(rc == 0)) {
@ -251,6 +259,8 @@ on_client_request_received(struct http_session *session)
return; return;
} }
//struct timeval t_start,t_end;
//gettimeofday(&t_start, NULL);
/* If the global request scheduler is full, return a 429 to the client */ /* If the global request scheduler is full, return a 429 to the client */
if (unlikely(global_request_scheduler_add(sandbox) == NULL)) { if (unlikely(global_request_scheduler_add(sandbox) == NULL)) {
debuglog("Failed to add sandbox to global queue\n"); debuglog("Failed to add sandbox to global queue\n");
@ -259,6 +269,11 @@ on_client_request_received(struct http_session *session)
http_session_set_response_header(session, 429); http_session_set_response_header(session, 429);
on_client_response_header_sending(session); on_client_response_header_sending(session);
} }
//gettimeofday(&t_end, NULL);
//long cost = t_end.tv_sec * 1000000 + t_end.tv_usec - t_start.tv_sec * 1000000 - t_start.tv_usec;
//printf("%ld ", cost);
} }
static void static void
@ -401,6 +416,7 @@ on_client_socket_epoll_event(struct epoll_event *evt)
noreturn void * noreturn void *
listener_thread_main(void *dummy) listener_thread_main(void *dummy)
{ {
thread_id = 200;
struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS]; struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS];
metrics_server_init(); metrics_server_init();

@ -7,6 +7,7 @@
#include "local_runqueue.h" #include "local_runqueue.h"
static struct local_runqueue_config local_runqueue; static struct local_runqueue_config local_runqueue;
thread_local uint32_t total_local_requests = 0;
#ifdef LOG_LOCAL_RUNQUEUE #ifdef LOG_LOCAL_RUNQUEUE
thread_local uint32_t local_runqueue_count = 0; thread_local uint32_t local_runqueue_count = 0;
@ -27,6 +28,7 @@ void
local_runqueue_add(struct sandbox *sandbox) local_runqueue_add(struct sandbox *sandbox)
{ {
assert(local_runqueue.add_fn != NULL); assert(local_runqueue.add_fn != NULL);
total_local_requests++;
#ifdef LOG_LOCAL_RUNQUEUE #ifdef LOG_LOCAL_RUNQUEUE
local_runqueue_count++; local_runqueue_count++;
#endif #endif

@ -12,6 +12,10 @@
#include "sandbox_functions.h" #include "sandbox_functions.h"
#include "runtime.h" #include "runtime.h"
extern thread_local int thread_id;
uint64_t total_held[1024] = {0};
uint64_t longest_held[1024] = {0};
thread_local static struct priority_queue *local_runqueue_minheap; thread_local static struct priority_queue *local_runqueue_minheap;
/** /**

@ -42,6 +42,8 @@ bool runtime_preemption_enabled = true;
uint32_t runtime_quantum_us = 5000; /* 5ms */ uint32_t runtime_quantum_us = 5000; /* 5ms */
uint64_t runtime_boot_timestamp; uint64_t runtime_boot_timestamp;
pid_t runtime_pid = 0; pid_t runtime_pid = 0;
thread_local int thread_id = -1;
bool first_request_comming = false;
/** /**
* Returns instructions on use of CLI if used incorrectly * Returns instructions on use of CLI if used incorrectly
@ -448,7 +450,8 @@ main(int argc, char **argv)
printf("Runtime Environment:\n"); printf("Runtime Environment:\n");
runtime_processor_speed_MHz = runtime_get_processor_speed_MHz(); //runtime_processor_speed_MHz = runtime_get_processor_speed_MHz();
runtime_processor_speed_MHz = 1500;
if (unlikely(runtime_processor_speed_MHz == 0)) panic("Failed to detect processor speed\n"); if (unlikely(runtime_processor_speed_MHz == 0)) panic("Failed to detect processor speed\n");
int heading_length = 30; int heading_length = 30;

@ -9,6 +9,7 @@
#include <threads.h> #include <threads.h>
#include <unistd.h> #include <unistd.h>
#include <ucontext.h> #include <ucontext.h>
#include <inttypes.h>
#include "arch/context.h" #include "arch/context.h"
#include "current_sandbox.h" #include "current_sandbox.h"
@ -25,6 +26,9 @@
#include "software_interrupt.h" #include "software_interrupt.h"
#include "software_interrupt_counts.h" #include "software_interrupt_counts.h"
extern uint64_t total_held[1024];
extern uint64_t longest_held[1024];
thread_local _Atomic volatile sig_atomic_t handler_depth = 0; thread_local _Atomic volatile sig_atomic_t handler_depth = 0;
thread_local _Atomic volatile sig_atomic_t deferred_sigalrm = 0; thread_local _Atomic volatile sig_atomic_t deferred_sigalrm = 0;
@ -33,7 +37,9 @@ thread_local _Atomic volatile sig_atomic_t deferred_sigalrm = 0;
**************************************/ **************************************/
extern pthread_t *runtime_worker_threads; extern pthread_t *runtime_worker_threads;
extern thread_local uint32_t total_local_requests;
extern time_t t_start;
extern thread_local int worker_thread_idx;
/************************** /**************************
* Private Static Inlines * * Private Static Inlines *
*************************/ *************************/
@ -75,6 +81,33 @@ propagate_sigalrm(siginfo_t *signal_info)
} }
} }
/**
* A POSIX signal is delivered to only one thread.
* This function broadcasts the sigint signal to all other worker threads
*/
static inline void
sigint_propagate_workers_listener(siginfo_t *signal_info)
{
/* Signal was sent directly by the kernel user space, so forward to other threads */
if (signal_info->si_code == SI_KERNEL || signal_info->si_code == SI_USER) {
for (int i = 0; i < runtime_worker_threads_count; i++) {
if (pthread_self() == runtime_worker_threads[i]) continue;
/* All threads should have been initialized */
assert(runtime_worker_threads[i] != 0);
pthread_kill(runtime_worker_threads[i], SIGINT);
}
/* send to listener thread */
if (pthread_self() != listener_thread_id) {
pthread_kill(listener_thread_id, SIGINT);
}
} else {
/* Signal forwarded from another thread. Just confirm it resulted from pthread_kill */
assert(signal_info->si_code == SI_TKILL);
}
}
static inline bool static inline bool
worker_thread_is_running_cooperative_scheduler(void) worker_thread_is_running_cooperative_scheduler(void)
{ {
@ -184,6 +217,22 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
break; break;
} }
case SIGINT: {
/* Stop the alarm timer first */
software_interrupt_disarm_timer();
sigint_propagate_workers_listener(signal_info);
/* calculate the throughput */
time_t t_end = time(NULL);
double seconds = difftime(t_end, t_start);
double throughput = atomic_load(&sandbox_state_totals[SANDBOX_COMPLETE]) / seconds;
uint32_t total_sandboxes_error = atomic_load(&sandbox_state_totals[SANDBOX_ERROR]);
printf("throughput is %f, error request is %u global total request %d worker %d total requests is %u worker total_held %"PRIu64" longest_held %"PRIu64" listener total_held %"PRIu64" longest_held %"PRIu64"\n",
throughput, total_sandboxes_error, atomic_load(&sandbox_state_totals[SANDBOX_COMPLETE]), worker_thread_idx, total_local_requests, total_held[worker_thread_idx], longest_held[worker_thread_idx], total_held[200], longest_held[200]);
fflush(stdout);
pthread_exit(0);
}
default: { default: {
const char *signal_name = strsignal(signal_type); const char *signal_name = strsignal(signal_type);
switch (signal_info->si_code) { switch (signal_info->si_code) {
@ -268,9 +317,10 @@ software_interrupt_initialize(void)
sigaddset(&signal_action.sa_mask, SIGUSR1); sigaddset(&signal_action.sa_mask, SIGUSR1);
sigaddset(&signal_action.sa_mask, SIGFPE); sigaddset(&signal_action.sa_mask, SIGFPE);
sigaddset(&signal_action.sa_mask, SIGSEGV); sigaddset(&signal_action.sa_mask, SIGSEGV);
sigaddset(&signal_action.sa_mask, SIGINT);
const int supported_signals[] = { SIGALRM, SIGUSR1, SIGFPE, SIGSEGV }; const int supported_signals[] = { SIGALRM, SIGUSR1, SIGFPE, SIGSEGV, SIGINT };
const size_t supported_signals_len = 4; const size_t supported_signals_len = 5;
for (int i = 0; i < supported_signals_len; i++) { for (int i = 0; i < supported_signals_len; i++) {
int signal = supported_signals[i]; int signal = supported_signals[i];

@ -22,6 +22,7 @@
* Worker Thread State * * Worker Thread State *
**************************/ **************************/
extern thread_local int thread_id;
/* context of the runtime thread before running sandboxes or to resume its "main". */ /* context of the runtime thread before running sandboxes or to resume its "main". */
thread_local struct arch_context worker_thread_base_context; thread_local struct arch_context worker_thread_base_context;
@ -47,6 +48,7 @@ worker_thread_main(void *argument)
/* Index was passed via argument */ /* Index was passed via argument */
worker_thread_idx = *(int *)argument; worker_thread_idx = *(int *)argument;
thread_id = worker_thread_idx;
/* Set my priority */ /* Set my priority */
// runtime_set_pthread_prio(pthread_self(), 2); // runtime_set_pthread_prio(pthread_self(), 2);
@ -65,6 +67,9 @@ worker_thread_main(void *argument)
software_interrupt_unmask_signal(SIGFPE); software_interrupt_unmask_signal(SIGFPE);
software_interrupt_unmask_signal(SIGSEGV); software_interrupt_unmask_signal(SIGSEGV);
/* Unmask SIGINT signals */
software_interrupt_unmask_signal(SIGINT);
/* Unmask signals, unless the runtime has disabled preemption */ /* Unmask signals, unless the runtime has disabled preemption */
if (runtime_preemption_enabled) { if (runtime_preemption_enabled) {
software_interrupt_unmask_signal(SIGALRM); software_interrupt_unmask_signal(SIGALRM);

Loading…
Cancel
Save