refactor: modularize and rename things

master
Sean McBride 5 years ago
parent 565a03db5d
commit 6946b08644

@ -8,9 +8,9 @@
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event #include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
// global queue for stealing (work-stealing-deque) // global queue for stealing (work-stealing-deque)
extern struct deque_sandbox *glb_dq; extern struct deque_sandbox *global_deque;
extern pthread_mutex_t glbq_mtx; extern pthread_mutex_t global_deque_mutex;
extern int epfd; extern int epoll_file_descriptor;
void alloc_linear_memory(void); void alloc_linear_memory(void);
void expand_memory(void); void expand_memory(void);
@ -30,7 +30,7 @@ get_memory_string(u32 offset)
char *naive_ptr = get_memory_ptr_for_runtime(offset, 1); char *naive_ptr = get_memory_ptr_for_runtime(offset, 1);
int i = 0; int i = 0;
while (1) { while (true) {
// Keep bounds checking the waters over and over until we know it's safe (we find a terminating // Keep bounds checking the waters over and over until we know it's safe (we find a terminating
// character) // character)
char ith_element = get_memory_ptr_for_runtime(offset, i + 1)[i]; char ith_element = get_memory_ptr_for_runtime(offset, i + 1)[i];

@ -120,10 +120,10 @@ sbox_request_alloc(struct module *mod, char *args, int sock, const struct sockad
s->addr = (struct sockaddr *)addr; s->addr = (struct sockaddr *)addr;
sandbox_run(s); sandbox_run(s);
return s; return s;
#else #else /* SBOX_SCALE_ALLOC */
return sandbox_alloc(mod, args, sock, addr); return sandbox_alloc(mod, args, sock, addr);
#endif #endif
#else #else /* STANDALONE */
return sandbox_alloc(mod, args, sock, addr); return sandbox_alloc(mod, args, sock, addr);
#endif #endif
} }
@ -208,8 +208,8 @@ void sandbox_response(void);
// should have been called with stack allocated and sandbox_current() set! // should have been called with stack allocated and sandbox_current() set!
void sandbox_entry(void); void sandbox_entry(void);
void sandbox_exit(void); void sandbox_exit(void);
extern struct deque_sandbox *glb_dq; extern struct deque_sandbox *global_deque;
extern pthread_mutex_t glbq_mtx; extern pthread_mutex_t global_deque_mutex;
static inline int static inline int
sandbox_deque_push(sbox_request_t *s) sandbox_deque_push(sbox_request_t *s)
@ -217,11 +217,11 @@ sandbox_deque_push(sbox_request_t *s)
int ret; int ret;
#if NCORES == 1 #if NCORES == 1
pthread_mutex_lock(&glbq_mtx); pthread_mutex_lock(&global_deque_mutex);
#endif #endif
ret = deque_push_sandbox(glb_dq, &s); ret = deque_push_sandbox(global_deque, &s);
#if NCORES == 1 #if NCORES == 1
pthread_mutex_unlock(&glbq_mtx); pthread_mutex_unlock(&global_deque_mutex);
#endif #endif
return ret; return ret;
@ -233,11 +233,11 @@ sandbox_deque_pop(sbox_request_t **s)
int ret; int ret;
#if NCORES == 1 #if NCORES == 1
pthread_mutex_lock(&glbq_mtx); pthread_mutex_lock(&global_deque_mutex);
#endif #endif
ret = deque_pop_sandbox(glb_dq, s); ret = deque_pop_sandbox(global_deque, s);
#if NCORES == 1 #if NCORES == 1
pthread_mutex_unlock(&glbq_mtx); pthread_mutex_unlock(&global_deque_mutex);
#endif #endif
return ret; return ret;
@ -252,7 +252,7 @@ sandbox_deque_steal(void)
sandbox_deque_pop(&s); sandbox_deque_pop(&s);
#else #else
// TODO: check! is there a sandboxing thread on same core as udp-server thread? // TODO: check! is there a sandboxing thread on same core as udp-server thread?
int r = deque_steal_sandbox(glb_dq, &s); int r = deque_steal_sandbox(global_deque, &s);
if (r) s = NULL; if (r) s = NULL;
#endif #endif

@ -82,7 +82,7 @@ extern __thread struct indirect_table_entry *module_indirect_table;
// for sandbox linear memory isolation // for sandbox linear memory isolation
extern __thread void *sandbox_lmbase; extern __thread void *sandbox_lmbase;
extern __thread u32 sandbox_lmbound; extern __thread u32 sandbox_lmbound;
extern i32 logfd; extern i32 log_file_descriptor;
// functions in the module to lookup and call per sandbox. // functions in the module to lookup and call per sandbox.
typedef i32 (*mod_main_fn_t)(i32 a, i32 b); typedef i32 (*mod_main_fn_t)(i32 a, i32 b);
@ -124,7 +124,7 @@ typedef enum
#ifdef DEBUG #ifdef DEBUG
#ifdef NOSTDIO #ifdef NOSTDIO
#define debuglog(fmt, ...) dprintf(logfd, "(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__) #define debuglog(fmt, ...) dprintf(log_file_descriptor, "(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__)
#else #else
#define debuglog(fmt, ...) printf("(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__) #define debuglog(fmt, ...) printf("(%d,%lu) %s: " fmt, sched_getcpu(), pthread_self(), __func__, ##__VA_ARGS__)
#endif #endif

@ -12,15 +12,12 @@
#include <sys/time.h> #include <sys/time.h>
#include <sys/resource.h> #include <sys/resource.h>
// TODO: I think this define is unused i32 log_file_descriptor = -1;
#define MOD_LINE_MAX 1024 u32 total_online_processors = 0;
u32 total_worker_processors = 0;
i32 logfd = -1; // Log File Descriptor u32 first_worker_processor = 0;
u32 ncores = 0; // Number of cores int worker_threads_argument[SBOX_NCORES] = { 0 }; // This is always empty, as we don't pass an argument
u32 sbox_ncores = 0; // Number of Sandboxing Cores pthread_t worker_threads[SBOX_NCORES];
u32 sbox_core_st = 0; // First Sandbox Core
pthread_t rtthd[SBOX_NCORES]; // An array of runtime threads
static unsigned long long static unsigned long long
get_time() get_time()
@ -63,68 +60,46 @@ void set_resource_limits_to_max(){
} }
} }
int void allocate_available_cores(){
main(int argc, char **argv)
{
printf("Starting Awsm\n");
#ifndef STANDALONE
// Array of arguments passed to the start_routine via pthread_create
// This is always empty, as we don't pass an argument
int rtthd_ret[SBOX_NCORES] = { 0 };
// Initialize the array of runtime threads
memset(rtthd, 0, sizeof(pthread_t) * SBOX_NCORES);
if (argc != 2) {
usage(argv[0]);
exit(-1);
}
set_resource_limits_to_max();
// Find the number of processors currently online // Find the number of processors currently online
ncores = sysconf(_SC_NPROCESSORS_ONLN); total_online_processors = sysconf(_SC_NPROCESSORS_ONLN);
// If multicore, we'll pin one core as a listener and run sandbox threads on all others // If multicore, we'll pin one core as a listener and run sandbox threads on all others
// If single core, we'll do everything on that one core if (total_online_processors > 1) {
if (ncores > 1) { first_worker_processor = 1;
u32 x = ncores - 1; // SBOX_NCORES can be used as a cap on the number of cores to use
sbox_ncores = SBOX_NCORES; // But if there are few cores that SBOX_NCORES, just use what is available
if (x < SBOX_NCORES) sbox_ncores = x; u32 max_possible_workers = total_online_processors - 1;
sbox_core_st = 1; total_worker_processors = (max_possible_workers >= SBOX_NCORES) ? SBOX_NCORES : max_possible_workers;
} else { } else {
sbox_ncores = 1; // If single core, we'll do everything on CPUID 0
first_worker_processor = 0;
total_worker_processors = 1;
}
debuglog("Number of cores %u, sandboxing cores %u (start: %u) and module reqs %u\n", total_online_processors, total_worker_processors,
first_worker_processor, MOD_REQ_CORE);
} }
debuglog("Number of cores %u, sandboxing cores %u (start: %u) and module reqs %u\n", ncores, sbox_ncores,
sbox_core_st, MOD_REQ_CORE);
// If NOSTIO is defined, close stdin, stdout, stderr, and write to logfile named awesome.log. Otherwise, log to STDOUT // If NOSTIO is defined, close stdin, stdout, stderr, and write to logfile named awesome.log. Otherwise, log to STDOUT
// NOSTIO = No Standard Input/Output? // NOSTIO = No Standard Input/Output?
void process_nostio(){
#ifdef NOSTDIO #ifdef NOSTDIO
fclose(stdout); fclose(stdout);
fclose(stderr); fclose(stderr);
fclose(stdin); fclose(stdin);
logfd = open(LOGFILE, O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU | S_IRWXG); log_file_descriptor = open(LOGFILE, O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU | S_IRWXG);
if (logfd < 0) { if (log_file_descriptor < 0) {
perror("open"); perror("open");
exit(-1); exit(-1);
} }
#else #else
logfd = 1; log_file_descriptor = 1;
#endif #endif
runtime_init();
debuglog("Parsing modules file [%s]\n", argv[1]);
if (util_parse_modules_file_json(argv[1])) {
printf("failed to parse modules file[%s]\n", argv[1]);
exit(-1);
} }
runtime_thd_init(); void start_worker_threads(){
for (int i = 0; i < total_worker_processors; i++) {
for (int i = 0; i < sbox_ncores; i++) { int ret = pthread_create(&worker_threads[i], NULL, sandbox_run_func, (void *)&worker_threads_argument[i]);
int ret = pthread_create(&rtthd[i], NULL, sandbox_run_func, (void *)&rtthd_ret[i]);
if (ret) { if (ret) {
errno = ret; errno = ret;
perror("pthread_create"); perror("pthread_create");
@ -133,14 +108,14 @@ main(int argc, char **argv)
cpu_set_t cs; cpu_set_t cs;
CPU_ZERO(&cs); CPU_ZERO(&cs);
CPU_SET(sbox_core_st + i, &cs); CPU_SET(first_worker_processor + i, &cs);
ret = pthread_setaffinity_np(rtthd[i], sizeof(cs), &cs); ret = pthread_setaffinity_np(worker_threads[i], sizeof(cs), &cs);
assert(ret == 0); assert(ret == 0);
} }
debuglog("Sandboxing environment ready!\n"); debuglog("Sandboxing environment ready!\n");
for (int i = 0; i < sbox_ncores; i++) { for (int i = 0; i < total_worker_processors; i++) {
int ret = pthread_join(rtthd[i], NULL); int ret = pthread_join(worker_threads[i], NULL);
if (ret) { if (ret) {
errno = ret; errno = ret;
perror("pthread_join"); perror("pthread_join");
@ -148,10 +123,11 @@ main(int argc, char **argv)
} }
} }
// runtime threads run forever!! so join should not return!! printf("\nWorker Threads unexpectedly returned!!\n");
printf("\nOh no..!! This can't be happening..!!\n");
exit(-1); exit(-1);
#else /* STANDALONE */ }
void execute_standalone(int argc, char **argv){
arch_context_init(&base_context, 0, 0); arch_context_init(&base_context, 0, 0);
uv_loop_init(&uvio); uv_loop_init(&uvio);
@ -179,5 +155,35 @@ main(int argc, char **argv)
// fprintf(stderr, "%llu\n", en - st); // fprintf(stderr, "%llu\n", en - st);
exit(0); exit(0);
}
int
main(int argc, char **argv)
{
printf("Starting Awsm\n");
#ifndef STANDALONE
if (argc != 2) {
usage(argv[0]);
exit(-1);
}
memset(worker_threads, 0, sizeof(pthread_t) * SBOX_NCORES);
set_resource_limits_to_max();
allocate_available_cores();
process_nostio();
runtime_init();
debuglog("Parsing modules file [%s]\n", argv[1]);
if (util_parse_modules_file_json(argv[1])) {
printf("failed to parse modules file[%s]\n", argv[1]);
exit(-1);
}
runtime_thd_init();
start_worker_threads();
#else /* STANDALONE */
execute_standalone();
#endif #endif
} }

@ -74,7 +74,7 @@ module_server_init(struct module *m)
accept_evt.data.ptr = (void *)m; accept_evt.data.ptr = (void *)m;
accept_evt.events = EPOLLIN; accept_evt.events = EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, m->srvsock, &accept_evt) < 0) assert(0); if (epoll_ctl(epoll_file_descriptor, EPOLL_CTL_ADD, m->srvsock, &accept_evt) < 0) assert(0);
#endif #endif
} }

@ -11,9 +11,9 @@
#include <uv.h> #include <uv.h>
#include <http_api.h> #include <http_api.h>
struct deque_sandbox *glb_dq; struct deque_sandbox *global_deque;
pthread_mutex_t glbq_mtx = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t global_deque_mutex = PTHREAD_MUTEX_INITIALIZER;
int epfd; int epoll_file_descriptor;
__thread static struct ps_list_head runq; // per-thread(core) run queue (doubly-linked list) __thread static struct ps_list_head runq; // per-thread(core) run queue (doubly-linked list)
__thread static struct ps_list_head endq; // per-thread(core) completion queue (doubly-linked list) __thread static struct ps_list_head endq; // per-thread(core) completion queue (doubly-linked list)
@ -200,7 +200,7 @@ void __attribute__((noinline)) __attribute__((noreturn)) sandbox_switch_preempt(
pthread_kill(pthread_self(), SIGUSR1); pthread_kill(pthread_self(), SIGUSR1);
assert(0); // should not get here.. assert(0); // should not get here..
while (1) while (true)
; ;
} }
static inline void static inline void
@ -232,7 +232,7 @@ sandbox_run_func(void *data)
uv_loop_init(&uvio); uv_loop_init(&uvio);
in_callback = 0; in_callback = 0;
while (1) { while (true) {
struct sandbox *s = sandbox_schedule_io(); struct sandbox *s = sandbox_schedule_io();
while (s) { while (s) {
sandbox_switch(s); sandbox_switch(s);
@ -266,18 +266,18 @@ void
sandbox_exit(void) sandbox_exit(void)
{ {
#ifndef STANDALONE #ifndef STANDALONE
struct sandbox *curr = sandbox_current(); struct sandbox *current_sandbox = sandbox_current();
assert(curr); assert(current_sandbox);
softint_disable(); softint_disable();
sandbox_local_stop(curr); sandbox_local_stop(current_sandbox);
curr->state = SANDBOX_RETURNED; current_sandbox->state = SANDBOX_RETURNED;
// free resources from "main function execution", as stack still in use. // free resources from "main function execution", as stack still in use.
struct sandbox *n = sandbox_schedule(0); struct sandbox *n = sandbox_schedule(0);
assert(n != curr); assert(n != current_sandbox);
softint_enable(); softint_enable();
// unmap linear memory only! // unmap linear memory only!
munmap(curr->linear_start, SBOX_MAX_MEM + PAGE_SIZE); munmap(current_sandbox->linear_start, SBOX_MAX_MEM + PAGE_SIZE);
// sandbox_local_end(curr); // sandbox_local_end(current_sandbox);
sandbox_switch(n); sandbox_switch(n);
#else #else
sandbox_switch(NULL); sandbox_switch(NULL);
@ -290,26 +290,26 @@ sandbox_exit(void)
* @return NULL * @return NULL
* *
* Used Globals: * Used Globals:
* epfd - the epoll file descriptor * epoll_file_descriptor - the epoll file descriptor
* *
*/ */
void * void *
runtime_accept_thdfn(void *d) runtime_accept_thdfn(void *d)
{ {
#ifndef STANDALONE #ifndef STANDALONE
struct epoll_event *epevts = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event)); struct epoll_event *epoll_events = (struct epoll_event *)malloc(EPOLL_MAX * sizeof(struct epoll_event));
int nreqs = 0; int total_requests = 0;
while (1) { while (true) {
int ready = epoll_wait(epfd, epevts, EPOLL_MAX, -1); int ready = epoll_wait(epoll_file_descriptor, epoll_events, EPOLL_MAX, -1);
for (int i = 0; i < ready; i++) { for (int i = 0; i < ready; i++) {
if (epevts[i].events & EPOLLERR) { if (epoll_events[i].events & EPOLLERR) {
perror("epoll_wait"); perror("epoll_wait");
assert(0); assert(0);
} }
struct sockaddr_in client; struct sockaddr_in client;
socklen_t client_len = sizeof(client); socklen_t client_len = sizeof(client);
struct module * m = (struct module *)epevts[i].data.ptr; struct module * m = (struct module *)epoll_events[i].data.ptr;
assert(m); assert(m);
int es = m->srvsock; int es = m->srvsock;
int s = accept(es, (struct sockaddr *)&client, &client_len); int s = accept(es, (struct sockaddr *)&client, &client_len);
@ -317,7 +317,8 @@ runtime_accept_thdfn(void *d)
perror("accept"); perror("accept");
assert(0); assert(0);
} }
nreqs++; total_requests++;
printf("Handling Request %d\n", total_requests);
// struct sandbox *sb = sandbox_alloc(m, m->name, s, (const struct sockaddr *)&client); // struct sandbox *sb = sandbox_alloc(m, m->name, s, (const struct sockaddr *)&client);
sbox_request_t *sb = sbox_request_alloc(m, m->name, s, (const struct sockaddr *)&client); sbox_request_t *sb = sbox_request_alloc(m, m->name, s, (const struct sockaddr *)&client);
@ -325,7 +326,7 @@ runtime_accept_thdfn(void *d)
} }
} }
free(epevts); free(epoll_events);
#endif #endif
return NULL; return NULL;
@ -337,13 +338,13 @@ runtime_accept_thdfn(void *d)
void void
runtime_init(void) runtime_init(void)
{ {
epfd = epoll_create1(0); epoll_file_descriptor = epoll_create1(0);
assert(epfd >= 0); assert(epoll_file_descriptor >= 0);
glb_dq = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox)); global_deque = (struct deque_sandbox *)malloc(sizeof(struct deque_sandbox));
assert(glb_dq); assert(global_deque);
// Note: Below is a Macro // Note: Below is a Macro
deque_init_sandbox(glb_dq, SBOX_MAX_REQS); deque_init_sandbox(global_deque, SBOX_MAX_REQS);
softint_mask(SIGUSR1); softint_mask(SIGUSR1);
softint_mask(SIGALRM); softint_mask(SIGALRM);

@ -85,7 +85,7 @@ skip:
return; return;
} }
extern pthread_t rtthd[]; extern pthread_t worker_threads[];
static inline void static inline void
softint_handler(int sig, siginfo_t *si, void *u) softint_handler(int sig, siginfo_t *si, void *u)
@ -103,11 +103,11 @@ softint_handler(int sig, siginfo_t *si, void *u)
int rt = 0; int rt = 0;
// deliver signal to all other runtime threads.. // deliver signal to all other runtime threads..
for (int i = 0; i < SBOX_NCORES; i++) { for (int i = 0; i < SBOX_NCORES; i++) {
if (pthread_self() == rtthd[i]) { if (pthread_self() == worker_threads[i]) {
rt = 1; rt = 1;
continue; continue;
} }
pthread_kill(rtthd[i], SIGALRM); pthread_kill(worker_threads[i], SIGALRM);
} }
assert(rt == 1); assert(rt == 1);
} else { } else {

@ -6,6 +6,7 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -244,7 +245,7 @@ skip:
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
while (1) { while (true) {
int s = 0; int s = 0;
printf("Test? (0 = exit)\n"); printf("Test? (0 = exit)\n");

@ -2,6 +2,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <stdio.h> #include <stdio.h>
#include <stdbool.h>
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <netinet/in.h> #include <netinet/in.h>
@ -82,7 +83,7 @@ main(int argc, char *argv[])
return -1; return -1;
} }
while (1) { while (true) {
fseek(f, 0, SEEK_SET); fseek(f, 0, SEEK_SET);
char line[MSG_MAX] = { 0 }; char line[MSG_MAX] = { 0 };

Loading…
Cancel
Save