diff --git a/runtime/include/runtime.h b/runtime/include/runtime.h index c2b10c5..44e32ad 100644 --- a/runtime/include/runtime.h +++ b/runtime/include/runtime.h @@ -8,9 +8,9 @@ #include // for epoll_create1(), epoll_ctl(), struct epoll_event // global queue for stealing (work-stealing-deque) -extern struct deque_sandbox *glb_dq; -extern pthread_mutex_t glbq_mtx; -extern int epfd; +extern struct deque_sandbox *global_deque; +extern pthread_mutex_t global_deque_mutex; +extern int epoll_file_descriptor; void alloc_linear_memory(void); void expand_memory(void); @@ -30,7 +30,7 @@ get_memory_string(u32 offset) char *naive_ptr = get_memory_ptr_for_runtime(offset, 1); int i = 0; - while (1) { + while (true) { // Keep bounds checking the waters over and over until we know it's safe (we find a terminating // character) char ith_element = get_memory_ptr_for_runtime(offset, i + 1)[i]; diff --git a/runtime/include/sandbox.h b/runtime/include/sandbox.h index 843057c..30b84bf 100644 --- a/runtime/include/sandbox.h +++ b/runtime/include/sandbox.h @@ -120,10 +120,10 @@ sbox_request_alloc(struct module *mod, char *args, int sock, const struct sockad s->addr = (struct sockaddr *)addr; sandbox_run(s); return s; -#else +#else /* SBOX_SCALE_ALLOC */ return sandbox_alloc(mod, args, sock, addr); #endif -#else +#else /* STANDALONE */ return sandbox_alloc(mod, args, sock, addr); #endif } @@ -208,8 +208,8 @@ void sandbox_response(void); // should have been called with stack allocated and sandbox_current() set! void sandbox_entry(void); void sandbox_exit(void); -extern struct deque_sandbox *glb_dq; -extern pthread_mutex_t glbq_mtx; +extern struct deque_sandbox *global_deque; +extern pthread_mutex_t global_deque_mutex; static inline int sandbox_deque_push(sbox_request_t *s) @@ -217,11 +217,11 @@ sandbox_deque_push(sbox_request_t *s) int ret; #if NCORES == 1 - pthread_mutex_lock(&glbq_mtx); + pthread_mutex_lock(&global_deque_mutex); #endif - ret = deque_push_sandbox(glb_dq, &s); + ret = deque_push_sandbox(global_deque, &s); #if NCORES == 1 - pthread_mutex_unlock(&glbq_mtx); + pthread_mutex_unlock(&global_deque_mutex); #endif return ret; @@ -233,11 +233,11 @@ sandbox_deque_pop(sbox_request_t **s) int ret; #if NCORES == 1 - pthread_mutex_lock(&glbq_mtx); + pthread_mutex_lock(&global_deque_mutex); #endif - ret = deque_pop_sandbox(glb_dq, s); + ret = deque_pop_sandbox(global_deque, s); #if NCORES == 1 - pthread_mutex_unlock(&glbq_mtx); + pthread_mutex_unlock(&global_deque_mutex); #endif return ret; @@ -252,7 +252,7 @@ sandbox_deque_steal(void) sandbox_deque_pop(&s); #else // TODO: check! is there a sandboxing thread on same core as udp-server thread? - int r = deque_steal_sandbox(glb_dq, &s); + int r = deque_steal_sandbox(global_deque, &s); if (r) s = NULL; #endif diff --git a/runtime/include/types.h b/runtime/include/types.h index e8dc7c0..5fb0441 100644 --- a/runtime/include/types.h +++ b/runtime/include/types.h @@ -82,7 +82,7 @@ extern __thread struct indirect_table_entry *module_indirect_table; // for sandbox linear memory isolation extern __thread void *sandbox_lmbase; extern __thread u32 sandbox_lmbound; -extern i32 logfd; +extern i32 log_file_descriptor; // functions in the module to lookup and call per sandbox. typedef i32 (*mod_main_fn_t)(i32 a, i32 b); @@ -124,7 +124,7 @@ typedef enum #ifdef DEBUG #ifdef NOSTDIO -#define debuglog(fmt, ...) dprintf(logfd, "(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__) +#define debuglog(fmt, ...) dprintf(log_file_descriptor, "(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__) #else #define debuglog(fmt, ...) printf("(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__) #endif diff --git a/runtime/src/main.c b/runtime/src/main.c index e233235..9f21149 100644 --- a/runtime/src/main.c +++ b/runtime/src/main.c @@ -12,15 +12,12 @@ #include #include -// TODO: I think this define is unused -#define MOD_LINE_MAX 1024 - -i32 logfd = -1; // Log File Descriptor -u32 ncores = 0; // Number of cores -u32 sbox_ncores = 0; // Number of Sandboxing Cores -u32 sbox_core_st = 0; // First Sandbox Core - -pthread_t rtthd[SBOX_NCORES]; // An array of runtime threads +i32 log_file_descriptor = -1; +u32 total_online_processors = 0; +u32 total_worker_processors = 0; +u32 first_worker_processor = 0; +int worker_threads_argument[SBOX_NCORES] = { 0 }; // This is always empty, as we don't pass an argument +pthread_t worker_threads[SBOX_NCORES]; static unsigned long long get_time() @@ -63,68 +60,46 @@ void set_resource_limits_to_max(){ } } -int -main(int argc, char **argv) -{ - printf("Starting Awsm\n"); -#ifndef STANDALONE - // Array of arguments passed to the start_routine via pthread_create - // This is always empty, as we don't pass an argument - int rtthd_ret[SBOX_NCORES] = { 0 }; - - // Initialize the array of runtime threads - memset(rtthd, 0, sizeof(pthread_t) * SBOX_NCORES); - - if (argc != 2) { - usage(argv[0]); - exit(-1); - } - - set_resource_limits_to_max(); - +void allocate_available_cores(){ // Find the number of processors currently online - ncores = sysconf(_SC_NPROCESSORS_ONLN); + total_online_processors = sysconf(_SC_NPROCESSORS_ONLN); // If multicore, we'll pin one core as a listener and run sandbox threads on all others - // If single core, we'll do everything on that one core - if (ncores > 1) { - u32 x = ncores - 1; - sbox_ncores = SBOX_NCORES; - if (x < SBOX_NCORES) sbox_ncores = x; - sbox_core_st = 1; + if (total_online_processors > 1) { + first_worker_processor = 1; + // SBOX_NCORES can be used as a cap on the number of cores to use + // But if there are few cores that SBOX_NCORES, just use what is available + u32 max_possible_workers = total_online_processors - 1; + total_worker_processors = (max_possible_workers >= SBOX_NCORES) ? SBOX_NCORES : max_possible_workers; } else { - sbox_ncores = 1; + // If single core, we'll do everything on CPUID 0 + first_worker_processor = 0; + total_worker_processors = 1; } - debuglog("Number of cores %u, sandboxing cores %u (start: %u) and module reqs %u\n", ncores, sbox_ncores, - sbox_core_st, MOD_REQ_CORE); + debuglog("Number of cores %u, sandboxing cores %u (start: %u) and module reqs %u\n", total_online_processors, total_worker_processors, + first_worker_processor, MOD_REQ_CORE); +} // If NOSTIO is defined, close stdin, stdout, stderr, and write to logfile named awesome.log. Otherwise, log to STDOUT // NOSTIO = No Standard Input/Output? +void process_nostio(){ #ifdef NOSTDIO fclose(stdout); fclose(stderr); fclose(stdin); - logfd = open(LOGFILE, O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU | S_IRWXG); - if (logfd < 0) { + log_file_descriptor = open(LOGFILE, O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU | S_IRWXG); + if (log_file_descriptor < 0) { perror("open"); exit(-1); } #else - logfd = 1; + log_file_descriptor = 1; #endif +} - runtime_init(); - - debuglog("Parsing modules file [%s]\n", argv[1]); - if (util_parse_modules_file_json(argv[1])) { - printf("failed to parse modules file[%s]\n", argv[1]); - exit(-1); - } - - runtime_thd_init(); - - for (int i = 0; i < sbox_ncores; i++) { - int ret = pthread_create(&rtthd[i], NULL, sandbox_run_func, (void *)&rtthd_ret[i]); +void start_worker_threads(){ + for (int i = 0; i < total_worker_processors; i++) { + int ret = pthread_create(&worker_threads[i], NULL, sandbox_run_func, (void *)&worker_threads_argument[i]); if (ret) { errno = ret; perror("pthread_create"); @@ -133,14 +108,14 @@ main(int argc, char **argv) cpu_set_t cs; CPU_ZERO(&cs); - CPU_SET(sbox_core_st + i, &cs); - ret = pthread_setaffinity_np(rtthd[i], sizeof(cs), &cs); + CPU_SET(first_worker_processor + i, &cs); + ret = pthread_setaffinity_np(worker_threads[i], sizeof(cs), &cs); assert(ret == 0); } debuglog("Sandboxing environment ready!\n"); - for (int i = 0; i < sbox_ncores; i++) { - int ret = pthread_join(rtthd[i], NULL); + for (int i = 0; i < total_worker_processors; i++) { + int ret = pthread_join(worker_threads[i], NULL); if (ret) { errno = ret; perror("pthread_join"); @@ -148,10 +123,11 @@ main(int argc, char **argv) } } - // runtime threads run forever!! so join should not return!! - printf("\nOh no..!! This can't be happening..!!\n"); + printf("\nWorker Threads unexpectedly returned!!\n"); exit(-1); -#else /* STANDALONE */ +} + +void execute_standalone(int argc, char **argv){ arch_context_init(&base_context, 0, 0); uv_loop_init(&uvio); @@ -179,5 +155,35 @@ main(int argc, char **argv) // fprintf(stderr, "%llu\n", en - st); exit(0); +} + +int +main(int argc, char **argv) +{ + printf("Starting Awsm\n"); +#ifndef STANDALONE + if (argc != 2) { + usage(argv[0]); + exit(-1); + } + + memset(worker_threads, 0, sizeof(pthread_t) * SBOX_NCORES); + + set_resource_limits_to_max(); + allocate_available_cores(); + process_nostio(); + runtime_init(); + + debuglog("Parsing modules file [%s]\n", argv[1]); + if (util_parse_modules_file_json(argv[1])) { + printf("failed to parse modules file[%s]\n", argv[1]); + exit(-1); + } + + runtime_thd_init(); + start_worker_threads(); + +#else /* STANDALONE */ + execute_standalone(); #endif } diff --git a/runtime/src/module.c b/runtime/src/module.c index a2a3e8d..8e351d4 100644 --- a/runtime/src/module.c +++ b/runtime/src/module.c @@ -74,7 +74,7 @@ module_server_init(struct module *m) accept_evt.data.ptr = (void *)m; accept_evt.events = EPOLLIN; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, m->srvsock, &accept_evt) < 0) assert(0); + if (epoll_ctl(epoll_file_descriptor, EPOLL_CTL_ADD, m->srvsock, &accept_evt) < 0) assert(0); #endif } diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index ea00f6b..2cb26ab 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -11,9 +11,9 @@ #include #include -struct deque_sandbox *glb_dq; -pthread_mutex_t glbq_mtx = PTHREAD_MUTEX_INITIALIZER; -int epfd; +struct deque_sandbox *global_deque; +pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER; +int epoll_file_descriptor; __thread static struct ps_list_head runq; // per-thread(core) run queue (doubly-linked list) __thread static struct ps_list_head endq; // per-thread(core) completion queue (doubly-linked list) @@ -200,7 +200,7 @@ void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt( pthread_kill(pthread_self(), SIGUSR1); assert(0); // should not get here.. - while (1) + while (true) ; } static inline void @@ -232,7 +232,7 @@ sandbox_run_func(void *data) uv_loop_init(&uvio); in_callback = 0; - while (1) { + while (true) { struct sandbox *s = sandbox_schedule_io(); while (s) { sandbox_switch(s); @@ -266,18 +266,18 @@ void sandbox_exit(void) { #ifndef STANDALONE - struct sandbox *curr = sandbox_current(); - assert(curr); + struct sandbox *current_sandbox = sandbox_current(); + assert(current_sandbox); softint_disable(); - sandbox_local_stop(curr); - curr->state = SANDBOX_RETURNED; + sandbox_local_stop(current_sandbox); + current_sandbox->state = SANDBOX_RETURNED; // free resources from "main function execution", as stack still in use. struct sandbox *n = sandbox_schedule(0); - assert(n != curr); + assert(n != current_sandbox); softint_enable(); // unmap linear memory only! - munmap(curr->linear_start, SBOX_MAX_MEM + PAGE_SIZE); - // sandbox_local_end(curr); + munmap(current_sandbox->linear_start, SBOX_MAX_MEM + PAGE_SIZE); + // sandbox_local_end(current_sandbox); sandbox_switch(n); #else sandbox_switch(NULL); @@ -290,26 +290,26 @@ sandbox_exit(void) * @return NULL * * Used Globals: - * epfd - the epoll file descriptor + * epoll_file_descriptor - the epoll file descriptor * */ void * runtime_accept_thdfn(void *d) { #ifndef STANDALONE - struct epoll_event *epevts = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); - int nreqs = 0; - while (1) { - int ready = epoll_wait(epfd, epevts, EPOLL_MAX, -1); + struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); + int total_requests = 0; + while (true) { + int ready = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1); for (int i = 0; i < ready; i++) { - if (epevts[i].events & EPOLLERR) { + if (epoll_events[i].events & EPOLLERR) { perror("epoll_wait"); assert(0); } struct sockaddr_in client; socklen_t client_len = sizeof(client); - struct module * m = (struct module *)epevts[i].data.ptr; + struct module * m = (struct module *)epoll_events[i].data.ptr; assert(m); int es = m->srvsock; int s = accept(es, (struct sockaddr *)&client, &client_len); @@ -317,7 +317,8 @@ runtime_accept_thdfn(void *d) perror("accept"); assert(0); } - nreqs++; + total_requests++; + printf("Handling Request %d\n", total_requests); // struct sandbox *sb = sandbox_alloc(m, m->name, s, (const struct sockaddr *)&client); sbox_request_t *sb = sbox_request_alloc(m, m->name, s, (const struct sockaddr *)&client); @@ -325,7 +326,7 @@ runtime_accept_thdfn(void *d) } } - free(epevts); + free(epoll_events); #endif return NULL; @@ -337,13 +338,13 @@ runtime_accept_thdfn(void *d) void runtime_init(void) { - epfd = epoll_create1(0); - assert(epfd >= 0); - glb_dq = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); - assert(glb_dq); + epoll_file_descriptor = epoll_create1(0); + assert(epoll_file_descriptor >= 0); + global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); + assert(global_deque); // Note: Below is a Macro - deque_init_sandbox(glb_dq, SBOX_MAX_REQS); + deque_init_sandbox(global_deque, SBOX_MAX_REQS); softint_mask(SIGUSR1); softint_mask(SIGALRM); diff --git a/runtime/src/softint.c b/runtime/src/softint.c index 11acebb..a5cce99 100644 --- a/runtime/src/softint.c +++ b/runtime/src/softint.c @@ -85,7 +85,7 @@ skip: return; } -extern pthread_t rtthd[]; +extern pthread_t worker_threads[]; static inline void softint_handler(int sig, siginfo_t *si, void *u) @@ -103,11 +103,11 @@ softint_handler(int sig, siginfo_t *si, void *u) int rt = 0; // deliver signal to all other runtime threads.. for (int i = 0; i < SBOX_NCORES; i++) { - if (pthread_self() == rtthd[i]) { + if (pthread_self() == worker_threads[i]) { rt = 1; continue; } - pthread_kill(rtthd[i], SIGALRM); + pthread_kill(worker_threads[i], SIGALRM); } assert(rt == 1); } else { diff --git a/runtime/tools/httpclient/client.c b/runtime/tools/httpclient/client.c index b1d2d2d..2eeb5e4 100644 --- a/runtime/tools/httpclient/client.c +++ b/runtime/tools/httpclient/client.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -244,7 +245,7 @@ skip: int main(int argc, char **argv) { - while (1) { + while (true) { int s = 0; printf("Test? (0 = exit)\n"); diff --git a/runtime/tools/udpclient/udpclient.c b/runtime/tools/udpclient/udpclient.c index a3439e7..c2aad19 100644 --- a/runtime/tools/udpclient/udpclient.c +++ b/runtime/tools/udpclient/udpclient.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -82,7 +83,7 @@ main(int argc, char *argv[]) return -1; } - while (1) { + while (true) { fseek(f, 0, SEEK_SET); char line[MSG_MAX] = { 0 };