From 95bf6fc1a90dc51a60345900e5d928402d0ef3c7 Mon Sep 17 00:00:00 2001 From: phani Date: Mon, 9 Dec 2019 13:30:16 -0500 Subject: [PATCH] udp response (return value of main) and multi-threaded udp client --- runtime/include/sandbox.h | 4 +- runtime/include/types.h | 2 + runtime/include/util.h | 4 +- runtime/src/main.c | 2 +- runtime/src/memory/64bit_nix.c | 3 +- runtime/src/module.c | 4 +- runtime/src/runtime.c | 34 +++++++++++--- runtime/src/sandbox.c | 25 ++++++++++- runtime/src/util.c | 10 ++--- runtime/tools/udpclient/Makefile | 2 +- runtime/tools/udpclient/udpclient.c | 69 +++++++++++++++++++++-------- 11 files changed, 119 insertions(+), 40 deletions(-) diff --git a/runtime/include/sandbox.h b/runtime/include/sandbox.h index 51c44d4..e274494 100644 --- a/runtime/include/sandbox.h +++ b/runtime/include/sandbox.h @@ -57,6 +57,7 @@ struct sandbox { i32 retval; struct io_handle handles[SBOX_MAX_OPEN]; + struct sockaddr client; //client requesting connection! char *read_buf; ssize_t read_len, read_size; @@ -69,7 +70,7 @@ struct sandbox { DEQUE_PROTOTYPE(sandbox, struct sandbox *); // a runtime resource, malloc on this! -struct sandbox *sandbox_alloc(struct module *mod, char *args); +struct sandbox *sandbox_alloc(struct module *mod, char *args, const struct sockaddr *addr); // should free stack and heap resources.. also any I/O handles. void sandbox_free(struct sandbox *sbox); @@ -138,6 +139,7 @@ void *sandbox_run_func(void *data); struct sandbox *sandbox_schedule(void); void sandbox_block(void); void sandbox_wakeup(sandbox_t *sb); +void sandbox_response(struct sandbox *sb); // should be the entry-point for each sandbox so it can do per-sandbox mem/etc init. // should have been called with stack allocated and sandbox_current() set! diff --git a/runtime/include/types.h b/runtime/include/types.h index 94cebd7..a1bc6b8 100644 --- a/runtime/include/types.h +++ b/runtime/include/types.h @@ -132,4 +132,6 @@ typedef enum { #define SBOX_NCORES (NCORES > 1 ? NCORES - 1 : NCORES) // number of sandboxing threads #define SBOX_MAX_REQS (1<<19) //random! +#define SBOX_RESP_STRSZ 32 + #endif /* SFRT_TYPES_H */ diff --git a/runtime/include/util.h b/runtime/include/util.h index fd60138..265c109 100644 --- a/runtime/include/util.h +++ b/runtime/include/util.h @@ -5,8 +5,8 @@ #include /* perhaps move it to module.h or sandbox.h? */ -struct sandbox *util_parse_sandbox_string_custom(struct module *m, char *str); -struct sandbox *util_parse_sandbox_string_json(struct module *m, char *str); +struct sandbox *util_parse_sandbox_string_custom(struct module *m, char *str, const struct sockaddr *addr); +struct sandbox *util_parse_sandbox_string_json(struct module *m, char *str, const struct sockaddr *addr); int util_parse_modules_file_json(char *filename); int util_parse_modules_file_custom(char *filename); diff --git a/runtime/src/main.c b/runtime/src/main.c index 5c7839a..c0e7003 100644 --- a/runtime/src/main.c +++ b/runtime/src/main.c @@ -107,7 +107,7 @@ main(int argc, char* argv[]) /* in current dir! */ struct module *m = module_alloc(argv[1], argv[1], 0, 0, 0, 0, 0, 0); assert(m); - struct sandbox *s = sandbox_alloc(m, argv[1]); + struct sandbox *s = sandbox_alloc(m, argv[1], NULL); exit(0); #endif diff --git a/runtime/src/memory/64bit_nix.c b/runtime/src/memory/64bit_nix.c index ba28790..1995364 100644 --- a/runtime/src/memory/64bit_nix.c +++ b/runtime/src/memory/64bit_nix.c @@ -38,8 +38,9 @@ free_linear_memory(void *base, u32 bound, u32 max) struct sandbox *curr = sandbox_current(); assert(base && bound); + // cannot free currently executing sandbox's memory - assert(base != curr->linear_start || base != sandbox_lmbase); + assert(curr == NULL || base != curr->linear_start || base != sandbox_lmbase); int ret = munmap(base, MAX_LINEAR_MEM); if (ret) perror("munmap"); diff --git a/runtime/src/module.c b/runtime/src/module.c index 6f0c9ba..5511366 100644 --- a/runtime/src/module.c +++ b/runtime/src/module.c @@ -39,8 +39,8 @@ module_on_recv(uv_udp_t *h, ssize_t nr, const uv_buf_t *rcvbuf, const struct soc debuglog("MC:%s, %s\n", h->data, rcvbuf->base); // invoke a function! - struct sandbox *s = util_parse_sandbox_string_json((struct module *)(h->data), rcvbuf->base); - //struct sandbox *s = util_parse_sandbox_string_custom((struct module *)(h->data), rcvbuf->base); + struct sandbox *s = util_parse_sandbox_string_json((struct module *)(h->data), rcvbuf->base, addr); + //struct sandbox *s = util_parse_sandbox_string_custom((struct module *)(h->data), rcvbuf->base, addr); assert(s); done: diff --git a/runtime/src/runtime.c b/runtime/src/runtime.c index 2c34735..f231f18 100644 --- a/runtime/src/runtime.c +++ b/runtime/src/runtime.c @@ -12,9 +12,9 @@ struct deque_sandbox *glb_dq; pthread_mutex_t glbq_mtx = PTHREAD_MUTEX_INITIALIZER; -// per-thread (per-core) run and wait queues.. (using doubly-linked-lists) +// per-thread (per-core) run and completion queue.. (using doubly-linked-lists) __thread static struct ps_list_head runq; -__thread static struct ps_list_head waitq; +__thread static struct ps_list_head endq; // current sandbox that is active.. __thread sandbox_t *current_sandbox = NULL; @@ -92,9 +92,24 @@ sandbox_schedule(void) return s; } +static inline void +sandbox_local_free(unsigned int n) +{ + int i = 0; + + while (i < n) { + i ++; + struct sandbox *s = ps_list_head_first_d(&endq, struct sandbox); + if (!s) break; + ps_list_rem_d(s); + sandbox_free(s); + } +} + struct sandbox * sandbox_schedule_uvio(void) { + sandbox_local_free(1); if (!in_callback) sandbox_io_nowait(); assert(sandbox_current() == NULL); @@ -154,13 +169,20 @@ sandbox_local_stop(struct sandbox *s) ps_list_rem_d(s); } +static inline void +sandbox_local_end(struct sandbox *s) +{ + assert(ps_list_singleton_d(s)); + ps_list_head_append_d(&endq, s); +} + void * sandbox_run_func(void *data) { arch_context_init(&base_context, 0, 0); ps_list_head_init(&runq); - ps_list_head_init(&waitq); + ps_list_head_init(&endq); softint_off = 0; next_context = NULL; #ifndef PREEMPT_DISABLE @@ -207,13 +229,13 @@ sandbox_exit(void) assert(curr); fprintf(stderr, "(%d,%lu) %s: %p, %s exit\n", sched_getcpu(), pthread_self(), __func__, curr, curr->mod->name); - //printf("%s: disable\n", __func__); softint_disable(); sandbox_local_stop(curr); curr->state = SANDBOX_RETURNED; - // TODO: free resources here? or only from main? + // free resources from "main function execution", as stack still in use. + sandbox_local_end(curr); + sandbox_response(curr); struct sandbox *n = sandbox_schedule(); - //printf("%s: enable\n", __func__); softint_enable(); sandbox_switch(n); #else diff --git a/runtime/src/sandbox.c b/runtime/src/sandbox.c index c40e7f5..a208e70 100644 --- a/runtime/src/sandbox.c +++ b/runtime/src/sandbox.c @@ -68,7 +68,7 @@ sandbox_entry(void) } struct sandbox * -sandbox_alloc(struct module *mod, char *args) +sandbox_alloc(struct module *mod, char *args, const struct sockaddr *addr) { if (!module_is_valid(mod)) return NULL; @@ -91,6 +91,7 @@ sandbox_alloc(struct module *mod, char *args) } for (int i = 0; i < SBOX_MAX_OPEN; i++) sb->handles[i].fd = -1; ps_list_init_d(sb); + if (addr) memcpy(&sb->client, addr, sizeof(struct sockaddr)); arch_context_init(&sb->ctxt, (reg_t)sandbox_entry, (reg_t)(sb->stack_start + sb->stack_size)); sandbox_run(sb); @@ -98,20 +99,40 @@ sandbox_alloc(struct module *mod, char *args) return sb; } +void +sandbox_response(struct sandbox *sb) +{ + // send response. +#ifndef STANDALONE + int sock = -1; + char resp[SBOX_RESP_STRSZ] = { 0 }; + int ret = uv_fileno((uv_handle_t *)&sb->mod->udpsrv, &sock); + assert(ret == 0); + // sends return value only for now! + sprintf(resp, "%d", sb->retval); + // using system call here because uv_udp_t is in the "module listener thread"'s loop, cannot access here. also dnot want to mess with cross-core/cross-thread uv loop states or structures. + ret = sendto(sock, resp, strlen(resp), 0, &sb->client, sizeof(struct sockaddr)); + assert(ret == strlen(resp)); +#endif +} + void sandbox_free(struct sandbox *sb) { + int ret; + // you have to context switch away to free a sandbox. if (!sb || sb == sandbox_current()) return; // again sandbox should be done and waiting for the parent. // TODO: this needs to be enhanced. you may be killing a sandbox when its in any other execution states. if (sb->state != SANDBOX_RETURNED) return; + module_release(sb->mod); free(sb->args); // remove stack! and also heap! - int ret = munmap(sb->stack_start, sb->stack_size); + ret = munmap(sb->stack_start, sb->stack_size); if (ret) perror("munmap"); // depending on the memory type diff --git a/runtime/src/util.c b/runtime/src/util.c index 797272d..d2bb687 100644 --- a/runtime/src/util.c +++ b/runtime/src/util.c @@ -152,7 +152,7 @@ parse_sandbox_file_custom(char *filename) assert(0); } - sb = sandbox_alloc(mod, args); + sb = sandbox_alloc(mod, args, NULL); assert(sb); total_boxes++; @@ -168,7 +168,7 @@ next: struct sandbox * -util_parse_sandbox_string_json(struct module *mod, char *str) +util_parse_sandbox_string_json(struct module *mod, char *str, const struct sockaddr *addr) { jsmn_parser sp; jsmntok_t tk[JSON_ELE_MAX]; @@ -202,7 +202,7 @@ util_parse_sandbox_string_json(struct module *mod, char *str) *(args + ((k - 1) * MOD_ARG_MAX_SZ) + g->end - g->start) = '\0'; } - struct sandbox *sb = sandbox_alloc(mod, args); + struct sandbox *sb = sandbox_alloc(mod, args, addr); assert(sb); return sb; @@ -215,7 +215,7 @@ util_parse_sandbox_string_json(struct module *mod, char *str) } struct sandbox * -util_parse_sandbox_string_custom(struct module *mod, char *str) +util_parse_sandbox_string_custom(struct module *mod, char *str, const struct sockaddr *addr) { char *tok = NULL, *src = str; @@ -238,7 +238,7 @@ util_parse_sandbox_string_custom(struct module *mod, char *str) assert(ntoks < MOD_MAX_ARGS); } - struct sandbox *sb = sandbox_alloc(mod, args); + struct sandbox *sb = sandbox_alloc(mod, args, addr); assert(sb); return sb; diff --git a/runtime/tools/udpclient/Makefile b/runtime/tools/udpclient/Makefile index ed447f5..22cee3f 100644 --- a/runtime/tools/udpclient/Makefile +++ b/runtime/tools/udpclient/Makefile @@ -1,6 +1,6 @@ udp: udpclient.c @echo "Compiling udpclient" - @gcc udpclient.c -o ../../bin/udpclient + @gcc udpclient.c -o ../../bin/udpclient -lpthread clean: @echo "Cleaning up udpclient" diff --git a/runtime/tools/udpclient/udpclient.c b/runtime/tools/udpclient/udpclient.c index 9eb9cc8..f115053 100644 --- a/runtime/tools/udpclient/udpclient.c +++ b/runtime/tools/udpclient/udpclient.c @@ -10,12 +10,20 @@ #include #include #include +#include #include #include #include #define MSG_MAX 1024 +#define STR_MAX 32 + +struct request { + char ip[32]; + char port[32]; + char msg[MSG_MAX]; +}; static char * remove_spaces(char *str) @@ -28,6 +36,41 @@ remove_spaces(char *str) return str; } +void * +send_fn(void *d) +{ + struct request *r = (struct request *)d; + + char resp[STR_MAX] = { 0 }; + int fd = -1; + struct sockaddr_in sa; + + sa.sin_family = AF_INET; + sa.sin_port = htons(atoi(r->port)); + sa.sin_addr.s_addr = inet_addr(r->ip); + if ((fd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) { + perror("Establishing socket"); + return NULL; + } + + if (sendto(fd, r->msg, strlen(r->msg), 0, (struct sockaddr*)&sa, sizeof(sa)) < 0 && + errno != EINTR) { + perror("sendto"); + return NULL; + } + + //todo: select rcv from! + int sa_len = sizeof(sa); + if (recvfrom(fd, resp, STR_MAX, 0, (struct sockaddr *)&sa, &sa_len) < 0) { + perror("recvfrom"); + } + printf("Done[%s]!\n", resp); + close(fd); + free(r); + + return NULL; +} + int main(int argc, char *argv[]) { @@ -47,10 +90,8 @@ int main(int argc, char *argv[]) char line[MSG_MAX] = { 0 }; while (fgets(line, MSG_MAX, f) != NULL) { - int fd = -1; - struct sockaddr_in sa; char *msg = NULL, *tok, *src = line; - char ip[32] = { 0 }, port[32] = { 0 }; + char ip[STR_MAX] = { 0 }, port[STR_MAX] = { 0 }; src = remove_spaces(src); if (src[0] == ';') goto next; @@ -67,32 +108,22 @@ int main(int argc, char *argv[]) printf("Exiting!\n"); exit(0); } else if (i == 1) { + pthread_t t; printf("Proceeding!\n"); + struct request *r = (struct request *)malloc(sizeof(struct request)); + strncpy(r->ip, ip, STR_MAX); + strncpy(r->port, port, STR_MAX); + strncpy(r->msg, msg, MSG_MAX); + pthread_create(&t, NULL, send_fn, r); } else { printf("Skipping!\n"); goto next; } - sa.sin_family = AF_INET; - sa.sin_port = htons(atoi(port)); - sa.sin_addr.s_addr = inet_addr(ip); - if ((fd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) { - perror("Establishing socket"); - return -1; - } - - if (sendto(fd, msg, strlen(msg), 0, (struct sockaddr*)&sa, sizeof(sa)) < 0 && - errno != EINTR) { - perror("sendto"); - return -1; - } - printf("Done!\n"); next: memset(line, 0, MSG_MAX); fflush(stdin); fflush(stdout); - - if (fd >= 0) close(fd); } }