let listener thread distribute requests to worker queue with RR

listener_thread_distributes_requests_worker_queue_with_RR
Xiaosu Lyu 3 years ago
parent f004d6c827
commit 8c733a315a

@ -10,7 +10,6 @@ if [[ $ARCH = "x86_64" ]]; then
elif [[ $ARCH = "aarch64" ]]; then
SHFMT_URL=https://github.com/patrickvane/shfmt/releases/download/master/shfmt_linux_arm
echo "ARM64 support is still a work in progress!"
exit 1
else
echo "This script only supports x86_64 and aarch64"
exit 1
@ -64,8 +63,6 @@ wget $SHFMT_URL -O shfmt && chmod +x shfmt && sudo mv shfmt /usr/local/bin/shfmt
sudo ./install_llvm.sh $LLVM_VERSION
curl -sS -L -O $WASI_SDK_URL && sudo dpkg -i wasi-sdk_12.0_amd64.deb && rm -f wasi-sdk_12.0_amd64.deb
if [ -z "${WASI_SDK_PATH}" ]; then
export WASI_SDK_PATH=/opt/wasi-sdk
echo "export WASI_SDK_PATH=/opt/wasi-sdk" >> ~/.bashrc

@ -4,7 +4,6 @@
#include <assert.h>
#include "arch/common.h"
#include "current_sandbox.h"
#define ARCH_SIG_JMP_OFF 0x100 /* Based on code generated! */
@ -47,24 +46,20 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
* Assumption: In the case of a slow context switch, the caller
* set current_sandbox to the sandbox containing the target context
*/
if (b->variant == ARCH_CONTEXT_VARIANT_SLOW) {
/*if (b->variant == ARCH_CONTEXT_VARIANT_SLOW) {
struct sandbox *current = current_sandbox_get();
assert(current != NULL && b == &current->ctxt);
}
}*/
#endif
/* if both a and b are NULL, there is no state change */
assert(a != NULL || b != NULL);
assert(a != NULL && b != NULL);
/* Assumption: The caller does not switch to itself */
assert(a != b);
/* Set any NULLs to worker_thread_base_context to resume execution of main */
if (a == NULL) a = &worker_thread_base_context;
if (b == NULL) b = &worker_thread_base_context;
/* A Transition {Unused, Running} -> Fast */
assert(a->variant == ARCH_CONTEXT_VARIANT_UNUSED || a->variant == ARCH_CONTEXT_VARIANT_RUNNING);
assert(a->variant == ARCH_CONTEXT_VARIANT_RUNNING);
/* B Transition {Fast, Slow} -> Running */
assert(b->variant == ARCH_CONTEXT_VARIANT_FAST || b->variant == ARCH_CONTEXT_VARIANT_SLOW);
@ -87,6 +82,8 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
"ldr x1, [%[bv]]\n\t"
"sub x1, x1, #2\n\t"
"cbz x1, slow%=\n\t"
"mov x3, #3\n\t"
"str x3, [%[bv]]\n\t" /* b->variant = ARCH_CONTEXT_VARIANT_RUNNING; */
"ldr x0, [%[b]]\n\t"
"ldr x1, [%[b], 8]\n\t"
"mov sp, x0\n\t"
@ -95,8 +92,6 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
"br %[slowpath]\n\t"
".align 8\n\t"
"reset%=:\n\t"
"mov x1, #3\n\t"
"str x1, [%[bv]]\n\t"
".align 8\n\t"
"exit%=:\n\t"
:
@ -109,6 +104,37 @@ arch_context_switch(struct arch_context *a, struct arch_context *b)
return 0;
}
/**
* Load a new sandbox that preempted an existing sandbox, restoring only the
* instruction pointer and stack pointer registers.
* @param active_context - the context of the current worker thread
* @param sandbox_context - the context that we want to restore
*/
static inline void
arch_context_restore_fast(mcontext_t *active_context, struct arch_context *sandbox_context)
{
assert(active_context != NULL);
assert(sandbox_context != NULL);
/* Assumption: Base Context is only ever used by arch_context_switch */
assert(sandbox_context != &worker_thread_base_context);
assert(sandbox_context->regs[UREG_SP]);
assert(sandbox_context->regs[UREG_IP]);
/* Transitioning from Fast -> Running */
assert(sandbox_context->variant == ARCH_CONTEXT_VARIANT_FAST);
sandbox_context->variant = ARCH_CONTEXT_VARIANT_RUNNING;
//active_context->gregs[REG_RSP] = sandbox_context->regs[UREG_SP];
//active_context->gregs[REG_RIP] = sandbox_context->regs[UREG_IP];
active_context->sp = sandbox_context->regs[UREG_SP];
active_context->pc = sandbox_context->regs[UREG_IP];
}
#else
#warning "Neither AARCH64 nor aarch64 was defined, but aarch64/context.h was included!"
#endif

@ -197,8 +197,6 @@ http_session_set_response_header(struct http_session *session, int status_code)
rc = fprintf(session->response_header.handle, HTTP_RESPONSE_CONTENT_LENGTH,
session->response_body.size);
assert(rc > 0);
rc = fputs(HTTP_RESPONSE_200_OK, session->response_header.handle);
assert(rc != EOF);
}
rc = fputs(HTTP_RESPONSE_TERMINATOR, session->response_header.handle);

@ -6,18 +6,21 @@
/* Returns pointer back if successful, null otherwise */
typedef void (*local_runqueue_add_fn_t)(struct sandbox *);
typedef void (*local_runqueue_add_fn_t_idx)(int index, struct sandbox *);
typedef bool (*local_runqueue_is_empty_fn_t)(void);
typedef void (*local_runqueue_delete_fn_t)(struct sandbox *sandbox);
typedef struct sandbox *(*local_runqueue_get_next_fn_t)();
struct local_runqueue_config {
local_runqueue_add_fn_t add_fn;
local_runqueue_add_fn_t_idx add_fn_idx;
local_runqueue_is_empty_fn_t is_empty_fn;
local_runqueue_delete_fn_t delete_fn;
local_runqueue_get_next_fn_t get_next_fn;
};
void local_runqueue_add(struct sandbox *);
void local_runqueue_add_index(int index, struct sandbox *);
void local_runqueue_delete(struct sandbox *);
bool local_runqueue_is_empty();
struct sandbox *local_runqueue_get_next();

@ -73,7 +73,7 @@ lock_unlock(lock_t *self, lock_node_t *node)
ck_spinlock_mcs_unlock(&self->lock, &node->node);
uint64_t now = __getcycles();
assert(node->time_locked < now);
assert(node->time_locked <= now);
uint64_t duration = now - node->time_locked;
node->time_locked = 0;
if (unlikely(duration > self->longest_held)) { self->longest_held = duration; }

@ -26,7 +26,7 @@
#define RUNTIME_MAX_EPOLL_EVENTS 128
#define RUNTIME_MAX_TENANT_COUNT 32
#define RUNTIME_RELATIVE_DEADLINE_US_MAX 3600000000 /* One Hour. Fits in uint32_t */
#define RUNTIME_RUNQUEUE_SIZE 256 /* Minimum guaranteed size. Might grow! */
#define RUNTIME_RUNQUEUE_SIZE 10240 /* Minimum guaranteed size. Might grow! */
#define RUNTIME_TENANT_QUEUE_SIZE 4096
enum RUNTIME_SIGALRM_HANDLER

@ -48,6 +48,7 @@ sandbox_perf_log_print_entry(struct sandbox *sandbox)
runtime_processor_speed_MHz);
}
static inline void
sandbox_perf_log_init()
{

@ -3,6 +3,7 @@
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <inttypes.h>
#include "arch/context.h"
#include "current_sandbox.h"
@ -34,7 +35,7 @@ sandbox_set_as_initialized(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -43,7 +43,7 @@ sandbox_set_as_returned(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -2,6 +2,7 @@
#include <assert.h>
#include <stdint.h>
#include <inttypes.h>
#include "arch/getcycles.h"
#include "local_runqueue.h"
@ -31,7 +32,7 @@ sandbox_set_as_runnable(struct sandbox *sandbox, sandbox_state_t last_state)
switch (last_state) {
case SANDBOX_INITIALIZED: {
sandbox->timestamp_of.dispatched = now;
local_runqueue_add(sandbox);
//local_runqueue_add(sandbox);
break;
}
case SANDBOX_ASLEEP: {
@ -45,7 +46,7 @@ sandbox_set_as_runnable(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;
@ -58,7 +59,6 @@ sandbox_set_as_runnable(struct sandbox *sandbox, sandbox_state_t last_state)
sandbox_state_transition_to_hook(sandbox, SANDBOX_RUNNABLE);
}
static inline void
sandbox_wakeup(struct sandbox *sandbox)
{

@ -39,7 +39,7 @@ sandbox_set_as_running_sys(struct sandbox *sandbox, sandbox_state_t last_state)
}
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -35,7 +35,7 @@ sandbox_set_as_running_user(struct sandbox *sandbox, sandbox_state_t last_state)
/* State Change Bookkeeping */
assert(now > sandbox->timestamp_of.last_state_change);
assert(now >= sandbox->timestamp_of.last_state_change);
sandbox->last_state_duration = now - sandbox->timestamp_of.last_state_change;
sandbox->duration_of_state[last_state] += sandbox->last_state_duration;
sandbox->timestamp_of.last_state_change = now;

@ -114,11 +114,11 @@ scheduler_edf_get_next()
uint64_t local_deadline = local == NULL ? UINT64_MAX : local->absolute_deadline;
struct sandbox *global = NULL;
uint64_t global_deadline = global_request_scheduler_peek();
//uint64_t global_deadline = global_request_scheduler_peek();
/* Try to pull and allocate from the global queue if earlier
* This will be placed at the head of the local runqueue */
if (global_deadline < local_deadline) {
/*if (global_deadline < local_deadline) {
if (global_request_scheduler_remove_if_earlier(&global, local_deadline) == 0) {
assert(global != NULL);
assert(global->absolute_deadline < local_deadline);
@ -126,10 +126,16 @@ scheduler_edf_get_next()
assert(global->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(global, SANDBOX_INITIALIZED);
}
}*/
if (local != NULL) {
if (local->state == SANDBOX_INITIALIZED) {
sandbox_prepare_execution_environment(local);
assert(local->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(local, SANDBOX_INITIALIZED);
}
}
/* Return what is at the head of the local runqueue or NULL if empty */
return local_runqueue_get_next();
return local;
}
static inline struct sandbox *

@ -12,7 +12,11 @@
#include "tenant.h"
#include "tenant_functions.h"
#include "http_session_perf_log.h"
#include "sandbox_set_as_runnable.h"
struct priority_queue* worker_queues[1024];
extern uint32_t runtime_worker_threads_count;
int rr_index = 0;
static void listener_thread_unregister_http_session(struct http_session *http);
static void panic_on_epoll_error(struct epoll_event *evt);
@ -252,13 +256,19 @@ on_client_request_received(struct http_session *session)
}
/* If the global request scheduler is full, return a 429 to the client */
if (unlikely(global_request_scheduler_add(sandbox) == NULL)) {
/*if (unlikely(global_request_scheduler_add(sandbox) == NULL)) {
debuglog("Failed to add sandbox to global queue\n");
sandbox_free(sandbox);
session->state = HTTP_SESSION_EXECUTION_COMPLETE;
http_session_set_response_header(session, 429);
on_client_response_header_sending(session);
}*/
if (rr_index == runtime_worker_threads_count) {
rr_index = 0;
}
local_runqueue_add_index(rr_index, sandbox);
rr_index++;
}
static void

@ -33,6 +33,16 @@ local_runqueue_add(struct sandbox *sandbox)
return local_runqueue.add_fn(sandbox);
}
void
local_runqueue_add_index(int index, struct sandbox *sandbox)
{
assert(local_runqueue.add_fn_idx != NULL);
#ifdef LOG_LOCAL_RUNQUEUE
local_runqueue_count++;
#endif
return local_runqueue.add_fn_idx(index, sandbox);
}
/**
* Delete a sandbox from the run queue
* @param sandbox to delete

@ -12,6 +12,8 @@
#include "sandbox_functions.h"
#include "runtime.h"
extern struct priority_queue* worker_queues[1024];
extern thread_local int worker_thread_idx;
thread_local static struct priority_queue *local_runqueue_minheap;
/**
@ -21,7 +23,8 @@ thread_local static struct priority_queue *local_runqueue_minheap;
bool
local_runqueue_minheap_is_empty()
{
return priority_queue_length_nolock(local_runqueue_minheap) == 0;
//return priority_queue_length_nolock(local_runqueue_minheap) == 0;
return priority_queue_length(local_runqueue_minheap) == 0;
}
/**
@ -32,16 +35,32 @@ local_runqueue_minheap_is_empty()
void
local_runqueue_minheap_add(struct sandbox *sandbox)
{
int return_code = priority_queue_enqueue_nolock(local_runqueue_minheap, sandbox);
/*int return_code = priority_queue_enqueue_nolock(local_runqueue_minheap, sandbox);
if (unlikely(return_code == -ENOSPC)) {
struct priority_queue *temp = priority_queue_grow_nolock(local_runqueue_minheap);
if (unlikely(temp == NULL)) panic("Failed to grow local runqueue\n");
local_runqueue_minheap = temp;
return_code = priority_queue_enqueue_nolock(local_runqueue_minheap, sandbox);
if (unlikely(return_code == -ENOSPC)) panic("Thread Runqueue is full!\n");
}*/
int return_code = priority_queue_enqueue(local_runqueue_minheap, sandbox);
if (return_code != 0) {
printf("add request to local queue failed, exit\n");
exit(0);
}
}
void
local_runqueue_minheap_add_index(int index, struct sandbox *sandbox)
{
int return_code = priority_queue_enqueue(worker_queues[index], sandbox);
if (return_code != 0) {
printf("add request to local queue failed, exit\n");
exit(0);
}
}
/**
* Deletes a sandbox from the runqueue
* @param sandbox to delete
@ -51,7 +70,8 @@ local_runqueue_minheap_delete(struct sandbox *sandbox)
{
assert(sandbox != NULL);
int rc = priority_queue_delete_nolock(local_runqueue_minheap, sandbox);
//int rc = priority_queue_delete_nolock(local_runqueue_minheap, sandbox);
int rc = priority_queue_delete(local_runqueue_minheap, sandbox);
if (rc == -1) panic("Tried to delete sandbox %lu from runqueue, but was not present\n", sandbox->id);
}
@ -67,7 +87,8 @@ local_runqueue_minheap_get_next()
{
/* Get the deadline of the sandbox at the head of the local request queue */
struct sandbox *next = NULL;
int rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&next);
//int rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&next);
int rc = priority_queue_top(local_runqueue_minheap, (void **)&next);
if (rc == -ENOENT) return NULL;
@ -81,10 +102,12 @@ void
local_runqueue_minheap_initialize()
{
/* Initialize local state */
local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority);
local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, true, sandbox_get_priority);
worker_queues[worker_thread_idx] = local_runqueue_minheap;
/* Register Function Pointers for Abstract Scheduling API */
struct local_runqueue_config config = { .add_fn = local_runqueue_minheap_add,
.add_fn_idx = local_runqueue_minheap_add_index,
.is_empty_fn = local_runqueue_minheap_is_empty,
.delete_fn = local_runqueue_minheap_delete,
.get_next_fn = local_runqueue_minheap_get_next };

@ -448,7 +448,8 @@ main(int argc, char **argv)
printf("Runtime Environment:\n");
runtime_processor_speed_MHz = runtime_get_processor_speed_MHz();
//runtime_processor_speed_MHz = runtime_get_processor_speed_MHz();
runtime_processor_speed_MHz = 1500;
if (unlikely(runtime_processor_speed_MHz == 0)) panic("Failed to detect processor speed\n");
int heading_length = 30;

Loading…
Cancel
Save