print out the lock total hold time and longest hold time of each worker thread and listener thread

listener_thread_distributes_requests_worker_queue_with_RR
Xiaosu Lyu 3 years ago
parent 8c733a315a
commit 72feda73d8

@ -94,7 +94,7 @@ BINARY_NAME=sledgert
# This flag tracks the total number of sandboxes in the various states
# It is useful to debug if sandboxes are "getting caught" in a particular state
# CFLAGS += -DSANDBOX_STATE_TOTALS
CFLAGS += -DSANDBOX_STATE_TOTALS
# This flag enables an per-worker atomic count of sandbox's local runqueue count in thread local storage
# Useful to debug if sandboxes are "getting caught" or "leaking" while in a local runqueue

@ -7,7 +7,10 @@
#include "arch/getcycles.h"
#include "runtime.h"
extern uint64_t total_held[1024];
extern uint64_t longest_held[1024];
extern thread_local int thread_id;
/* A linked list of nodes */
struct lock_wrapper {
uint64_t longest_held;
@ -76,6 +79,12 @@ lock_unlock(lock_t *self, lock_node_t *node)
assert(node->time_locked <= now);
uint64_t duration = now - node->time_locked;
node->time_locked = 0;
if (unlikely(duration > self->longest_held)) { self->longest_held = duration; }
if (unlikely(duration > self->longest_held)) {
self->longest_held = duration;
}
self->total_held += duration;
if (unlikely(duration > longest_held[thread_id])) {
longest_held[thread_id] = duration;
}
total_held[thread_id] += duration;
}

@ -26,7 +26,7 @@
#define RUNTIME_MAX_EPOLL_EVENTS 128
#define RUNTIME_MAX_TENANT_COUNT 32
#define RUNTIME_RELATIVE_DEADLINE_US_MAX 3600000000 /* One Hour. Fits in uint32_t */
#define RUNTIME_RUNQUEUE_SIZE 10240 /* Minimum guaranteed size. Might grow! */
#define RUNTIME_RUNQUEUE_SIZE 10240000 /* Minimum guaranteed size. Might grow! */
#define RUNTIME_TENANT_QUEUE_SIZE 4096
enum RUNTIME_SIGALRM_HANDLER

@ -31,7 +31,7 @@ software_interrupt_mask_signal(int signal)
sigset_t set;
int return_code;
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV);
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV || signal == SIGINT);
/* all threads created by the calling thread will have signal blocked */
sigemptyset(&set);
sigaddset(&set, signal);
@ -55,7 +55,7 @@ software_interrupt_unmask_signal(int signal)
sigset_t set;
int return_code;
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV);
assert(signal == SIGALRM || signal == SIGUSR1 || signal == SIGFPE || signal == SIGSEGV || signal == SIGINT);
/* all threads created by the calling thread will have signal unblocked */
sigemptyset(&set);
sigaddset(&set, signal);

@ -14,6 +14,7 @@
#include "http_session_perf_log.h"
#include "sandbox_set_as_runnable.h"
extern thread_local int thread_id;
struct priority_queue* worker_queues[1024];
extern uint32_t runtime_worker_threads_count;
int rr_index = 0;
@ -59,7 +60,6 @@ listener_thread_initialize(void)
assert(ret == 0);
ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs);
assert(ret == 0);
printf("\tListener core thread: %lx\n", listener_thread_id);
}
@ -267,7 +267,12 @@ on_client_request_received(struct http_session *session)
if (rr_index == runtime_worker_threads_count) {
rr_index = 0;
}
//struct timeval t_start,t_end;
//gettimeofday(&t_start, NULL);
local_runqueue_add_index(rr_index, sandbox);
//gettimeofday(&t_end, NULL);
//long cost = t_end.tv_sec * 1000000 + t_end.tv_usec - t_start.tv_sec * 1000000 - t_start.tv_usec;
//printf("%ld ", cost);
rr_index++;
}
@ -411,6 +416,7 @@ on_client_socket_epoll_event(struct epoll_event *evt)
noreturn void *
listener_thread_main(void *dummy)
{
thread_id = 200;
struct epoll_event epoll_events[RUNTIME_MAX_EPOLL_EVENTS];
metrics_server_init();

@ -8,6 +8,7 @@
static struct local_runqueue_config local_runqueue;
thread_local uint32_t total_local_requests = 0;
#ifdef LOG_LOCAL_RUNQUEUE
thread_local uint32_t local_runqueue_count = 0;
#endif
@ -51,6 +52,7 @@ void
local_runqueue_delete(struct sandbox *sandbox)
{
assert(local_runqueue.delete_fn != NULL);
total_local_requests++;
#ifdef LOG_LOCAL_RUNQUEUE
local_runqueue_count--;
#endif

@ -12,6 +12,10 @@
#include "sandbox_functions.h"
#include "runtime.h"
extern thread_local int thread_id;
uint64_t total_held[1024] = {0};
uint64_t longest_held[1024] = {0};
extern struct priority_queue* worker_queues[1024];
extern thread_local int worker_thread_idx;
thread_local static struct priority_queue *local_runqueue_minheap;
@ -46,8 +50,12 @@ local_runqueue_minheap_add(struct sandbox *sandbox)
int return_code = priority_queue_enqueue(local_runqueue_minheap, sandbox);
if (return_code != 0) {
printf("add request to local queue failed, exit\n");
exit(0);
if (return_code == -ENOSPC) {
panic("Thread Runqueue is full!\n");
} else {
printf("add request to local queue failed, exit\n");
exit(0);
}
}
}
@ -56,8 +64,12 @@ local_runqueue_minheap_add_index(int index, struct sandbox *sandbox)
{
int return_code = priority_queue_enqueue(worker_queues[index], sandbox);
if (return_code != 0) {
printf("add request to local queue failed, exit\n");
exit(0);
if (return_code == -ENOSPC) {
panic("Thread Runqueue is full!\n");
} else {
printf("add request to local queue failed, exit\n");
exit(0);
}
}
}

@ -41,8 +41,10 @@ enum RUNTIME_SIGALRM_HANDLER runtime_sigalrm_handler = RUNTIME_SIGALRM_HANDLER_B
bool runtime_preemption_enabled = true;
uint32_t runtime_quantum_us = 5000; /* 5ms */
uint64_t runtime_boot_timestamp;
time_t t_start;
pid_t runtime_pid = 0;
thread_local int thread_id = -1;
/**
* Returns instructions on use of CLI if used incorrectly
* @param cmd - The command the user entered
@ -494,6 +496,7 @@ main(int argc, char **argv)
}
runtime_boot_timestamp = __getcycles();
t_start = time(NULL);
for (int tenant_idx = 0; tenant_idx < tenant_config_vec_len; tenant_idx++) {
tenant_config_deinit(&tenant_config_vec[tenant_idx]);

@ -9,6 +9,7 @@
#include <threads.h>
#include <unistd.h>
#include <ucontext.h>
#include <inttypes.h>
#include "arch/context.h"
#include "current_sandbox.h"
@ -25,14 +26,24 @@
#include "software_interrupt.h"
#include "software_interrupt_counts.h"
#ifdef SANDBOX_STATE_TOTALS
extern _Atomic uint32_t sandbox_state_totals[SANDBOX_STATE_COUNT];
#endif
extern uint64_t total_held[1024];
extern uint64_t longest_held[1024];
thread_local _Atomic volatile sig_atomic_t handler_depth = 0;
thread_local _Atomic volatile sig_atomic_t deferred_sigalrm = 0;
/***************************************
* Externs
**************************************/
extern thread_local uint32_t total_local_requests;
extern pthread_t *runtime_worker_threads;
extern time_t t_start;
extern thread_local int worker_thread_idx;
/**************************
* Private Static Inlines *
@ -74,6 +85,31 @@ propagate_sigalrm(siginfo_t *signal_info)
assert(signal_info->si_code == SI_TKILL);
}
}
/**
* A POSIX signal is delivered to only one thread.
* This function broadcasts the sigint signal to all other worker threads
*/
static inline void
sigint_propagate_workers_listener(siginfo_t *signal_info)
{
/* Signal was sent directly by the kernel user space, so forward to other threads */
if (signal_info->si_code == SI_KERNEL || signal_info->si_code == SI_USER) {
for (int i = 0; i < runtime_worker_threads_count; i++) {
if (pthread_self() == runtime_worker_threads[i]) continue;
/* All threads should have been initialized */
assert(runtime_worker_threads[i] != 0);
pthread_kill(runtime_worker_threads[i], SIGINT);
}
/* send to listener thread */
if (pthread_self() != listener_thread_id) {
pthread_kill(listener_thread_id, SIGINT);
}
} else {
/* Signal forwarded from another thread. Just confirm it resulted from pthread_kill */
assert(signal_info->si_code == SI_TKILL);
}
}
static inline bool
worker_thread_is_running_cooperative_scheduler(void)
@ -184,6 +220,21 @@ software_interrupt_handle_signals(int signal_type, siginfo_t *signal_info, void
break;
}
case SIGINT: {
/* Stop the alarm timer first */
software_interrupt_disarm_timer();
sigint_propagate_workers_listener(signal_info);
/* calculate the throughput */
time_t t_end = time(NULL);
double seconds = difftime(t_end, t_start);
double throughput = atomic_load(&sandbox_state_totals[SANDBOX_COMPLETE]) / seconds;
uint32_t total_sandboxes_error = atomic_load(&sandbox_state_totals[SANDBOX_ERROR]);
printf("throughput is %f, error request is %u global total request %d worker %d total requests is %u worker total_held %"PRIu64" longest_held %"PRIu64" listener total_held %"PRIu64" longest_held %"PRIu64"\n",
throughput, total_sandboxes_error, atomic_load(&sandbox_state_totals[SANDBOX_COMPLETE]), worker_thread_idx, total_local_requests, total_held[worker_thread_idx], longest_held[worker_thread_idx], total_held[200], longest_held[200]);
fflush(stdout);
pthread_exit(0);
}
default: {
const char *signal_name = strsignal(signal_type);
switch (signal_info->si_code) {
@ -268,9 +319,10 @@ software_interrupt_initialize(void)
sigaddset(&signal_action.sa_mask, SIGUSR1);
sigaddset(&signal_action.sa_mask, SIGFPE);
sigaddset(&signal_action.sa_mask, SIGSEGV);
sigaddset(&signal_action.sa_mask, SIGINT);
const int supported_signals[] = { SIGALRM, SIGUSR1, SIGFPE, SIGSEGV };
const size_t supported_signals_len = 4;
const int supported_signals[] = { SIGALRM, SIGUSR1, SIGFPE, SIGSEGV, SIGINT};
const size_t supported_signals_len = 5;
for (int i = 0; i < supported_signals_len; i++) {
int signal = supported_signals[i];

@ -18,6 +18,7 @@
#include "tenant_functions.h"
#include "priority_queue.h"
extern thread_local int thread_id;
/***************************
* Worker Thread State *
**************************/
@ -47,7 +48,7 @@ worker_thread_main(void *argument)
/* Index was passed via argument */
worker_thread_idx = *(int *)argument;
thread_id = worker_thread_idx;
/* Set my priority */
// runtime_set_pthread_prio(pthread_self(), 2);
pthread_setschedprio(pthread_self(), -20);
@ -65,6 +66,9 @@ worker_thread_main(void *argument)
software_interrupt_unmask_signal(SIGFPE);
software_interrupt_unmask_signal(SIGSEGV);
/* Unmask SIGINT signals */
software_interrupt_unmask_signal(SIGINT);
/* Unmask signals, unless the runtime has disabled preemption */
if (runtime_preemption_enabled) {
software_interrupt_unmask_signal(SIGALRM);

Loading…
Cancel
Save