udp response (return value of main) and multi-threaded udp client

main
phani 5 years ago
parent d8e4cc3d99
commit 95bf6fc1a9

@ -57,6 +57,7 @@ struct sandbox {
i32 retval;
struct io_handle handles[SBOX_MAX_OPEN];
struct sockaddr client; //client requesting connection!
char *read_buf;
ssize_t read_len, read_size;
@ -69,7 +70,7 @@ struct sandbox {
DEQUE_PROTOTYPE(sandbox, struct sandbox *);
// a runtime resource, malloc on this!
struct sandbox *sandbox_alloc(struct module *mod, char *args);
struct sandbox *sandbox_alloc(struct module *mod, char *args, const struct sockaddr *addr);
// should free stack and heap resources.. also any I/O handles.
void sandbox_free(struct sandbox *sbox);
@ -138,6 +139,7 @@ void *sandbox_run_func(void *data);
struct sandbox *sandbox_schedule(void);
void sandbox_block(void);
void sandbox_wakeup(sandbox_t *sb);
void sandbox_response(struct sandbox *sb);
// should be the entry-point for each sandbox so it can do per-sandbox mem/etc init.
// should have been called with stack allocated and sandbox_current() set!

@ -132,4 +132,6 @@ typedef enum {
#define SBOX_NCORES (NCORES > 1 ? NCORES - 1 : NCORES) // number of sandboxing threads
#define SBOX_MAX_REQS (1<<19) //random!
#define SBOX_RESP_STRSZ 32
#endif /* SFRT_TYPES_H */

@ -5,8 +5,8 @@
#include <module.h>
/* perhaps move it to module.h or sandbox.h? */
struct sandbox *util_parse_sandbox_string_custom(struct module *m, char *str);
struct sandbox *util_parse_sandbox_string_json(struct module *m, char *str);
struct sandbox *util_parse_sandbox_string_custom(struct module *m, char *str, const struct sockaddr *addr);
struct sandbox *util_parse_sandbox_string_json(struct module *m, char *str, const struct sockaddr *addr);
int util_parse_modules_file_json(char *filename);
int util_parse_modules_file_custom(char *filename);

@ -107,7 +107,7 @@ main(int argc, char* argv[])
/* in current dir! */
struct module *m = module_alloc(argv[1], argv[1], 0, 0, 0, 0, 0, 0);
assert(m);
struct sandbox *s = sandbox_alloc(m, argv[1]);
struct sandbox *s = sandbox_alloc(m, argv[1], NULL);
exit(0);
#endif

@ -38,8 +38,9 @@ free_linear_memory(void *base, u32 bound, u32 max)
struct sandbox *curr = sandbox_current();
assert(base && bound);
// cannot free currently executing sandbox's memory
assert(base != curr->linear_start || base != sandbox_lmbase);
assert(curr == NULL || base != curr->linear_start || base != sandbox_lmbase);
int ret = munmap(base, MAX_LINEAR_MEM);
if (ret) perror("munmap");

@ -39,8 +39,8 @@ module_on_recv(uv_udp_t *h, ssize_t nr, const uv_buf_t *rcvbuf, const struct soc
debuglog("MC:%s, %s\n", h->data, rcvbuf->base);
// invoke a function!
struct sandbox *s = util_parse_sandbox_string_json((struct module *)(h->data), rcvbuf->base);
//struct sandbox *s = util_parse_sandbox_string_custom((struct module *)(h->data), rcvbuf->base);
struct sandbox *s = util_parse_sandbox_string_json((struct module *)(h->data), rcvbuf->base, addr);
//struct sandbox *s = util_parse_sandbox_string_custom((struct module *)(h->data), rcvbuf->base, addr);
assert(s);
done:

@ -12,9 +12,9 @@
struct deque_sandbox *glb_dq;
pthread_mutex_t glbq_mtx = PTHREAD_MUTEX_INITIALIZER;
// per-thread (per-core) run and wait queues.. (using doubly-linked-lists)
// per-thread (per-core) run and completion queue.. (using doubly-linked-lists)
__thread static struct ps_list_head runq;
__thread static struct ps_list_head waitq;
__thread static struct ps_list_head endq;
// current sandbox that is active..
__thread sandbox_t *current_sandbox = NULL;
@ -92,9 +92,24 @@ sandbox_schedule(void)
return s;
}
static inline void
sandbox_local_free(unsigned int n)
{
int i = 0;
while (i < n) {
i ++;
struct sandbox *s = ps_list_head_first_d(&endq, struct sandbox);
if (!s) break;
ps_list_rem_d(s);
sandbox_free(s);
}
}
struct sandbox *
sandbox_schedule_uvio(void)
{
sandbox_local_free(1);
if (!in_callback) sandbox_io_nowait();
assert(sandbox_current() == NULL);
@ -154,13 +169,20 @@ sandbox_local_stop(struct sandbox *s)
ps_list_rem_d(s);
}
static inline void
sandbox_local_end(struct sandbox *s)
{
assert(ps_list_singleton_d(s));
ps_list_head_append_d(&endq, s);
}
void *
sandbox_run_func(void *data)
{
arch_context_init(&base_context, 0, 0);
ps_list_head_init(&runq);
ps_list_head_init(&waitq);
ps_list_head_init(&endq);
softint_off = 0;
next_context = NULL;
#ifndef PREEMPT_DISABLE
@ -207,13 +229,13 @@ sandbox_exit(void)
assert(curr);
fprintf(stderr, "(%d,%lu) %s: %p, %s exit\n", sched_getcpu(), pthread_self(), __func__, curr, curr->mod->name);
//printf("%s: disable\n", __func__);
softint_disable();
sandbox_local_stop(curr);
curr->state = SANDBOX_RETURNED;
// TODO: free resources here? or only from main?
// free resources from "main function execution", as stack still in use.
sandbox_local_end(curr);
sandbox_response(curr);
struct sandbox *n = sandbox_schedule();
//printf("%s: enable\n", __func__);
softint_enable();
sandbox_switch(n);
#else

@ -68,7 +68,7 @@ sandbox_entry(void)
}
struct sandbox *
sandbox_alloc(struct module *mod, char *args)
sandbox_alloc(struct module *mod, char *args, const struct sockaddr *addr)
{
if (!module_is_valid(mod)) return NULL;
@ -91,6 +91,7 @@ sandbox_alloc(struct module *mod, char *args)
}
for (int i = 0; i < SBOX_MAX_OPEN; i++) sb->handles[i].fd = -1;
ps_list_init_d(sb);
if (addr) memcpy(&sb->client, addr, sizeof(struct sockaddr));
arch_context_init(&sb->ctxt, (reg_t)sandbox_entry, (reg_t)(sb->stack_start + sb->stack_size));
sandbox_run(sb);
@ -98,20 +99,40 @@ sandbox_alloc(struct module *mod, char *args)
return sb;
}
void
sandbox_response(struct sandbox *sb)
{
// send response.
#ifndef STANDALONE
int sock = -1;
char resp[SBOX_RESP_STRSZ] = { 0 };
int ret = uv_fileno((uv_handle_t *)&sb->mod->udpsrv, &sock);
assert(ret == 0);
// sends return value only for now!
sprintf(resp, "%d", sb->retval);
// using system call here because uv_udp_t is in the "module listener thread"'s loop, cannot access here. also dnot want to mess with cross-core/cross-thread uv loop states or structures.
ret = sendto(sock, resp, strlen(resp), 0, &sb->client, sizeof(struct sockaddr));
assert(ret == strlen(resp));
#endif
}
void
sandbox_free(struct sandbox *sb)
{
int ret;
// you have to context switch away to free a sandbox.
if (!sb || sb == sandbox_current()) return;
// again sandbox should be done and waiting for the parent.
// TODO: this needs to be enhanced. you may be killing a sandbox when its in any other execution states.
if (sb->state != SANDBOX_RETURNED) return;
module_release(sb->mod);
free(sb->args);
// remove stack! and also heap!
int ret = munmap(sb->stack_start, sb->stack_size);
ret = munmap(sb->stack_start, sb->stack_size);
if (ret) perror("munmap");
// depending on the memory type

@ -152,7 +152,7 @@ parse_sandbox_file_custom(char *filename)
assert(0);
}
sb = sandbox_alloc(mod, args);
sb = sandbox_alloc(mod, args, NULL);
assert(sb);
total_boxes++;
@ -168,7 +168,7 @@ next:
struct sandbox *
util_parse_sandbox_string_json(struct module *mod, char *str)
util_parse_sandbox_string_json(struct module *mod, char *str, const struct sockaddr *addr)
{
jsmn_parser sp;
jsmntok_t tk[JSON_ELE_MAX];
@ -202,7 +202,7 @@ util_parse_sandbox_string_json(struct module *mod, char *str)
*(args + ((k - 1) * MOD_ARG_MAX_SZ) + g->end - g->start) = '\0';
}
struct sandbox *sb = sandbox_alloc(mod, args);
struct sandbox *sb = sandbox_alloc(mod, args, addr);
assert(sb);
return sb;
@ -215,7 +215,7 @@ util_parse_sandbox_string_json(struct module *mod, char *str)
}
struct sandbox *
util_parse_sandbox_string_custom(struct module *mod, char *str)
util_parse_sandbox_string_custom(struct module *mod, char *str, const struct sockaddr *addr)
{
char *tok = NULL, *src = str;
@ -238,7 +238,7 @@ util_parse_sandbox_string_custom(struct module *mod, char *str)
assert(ntoks < MOD_MAX_ARGS);
}
struct sandbox *sb = sandbox_alloc(mod, args);
struct sandbox *sb = sandbox_alloc(mod, args, addr);
assert(sb);
return sb;

@ -1,6 +1,6 @@
udp: udpclient.c
@echo "Compiling udpclient"
@gcc udpclient.c -o ../../bin/udpclient
@gcc udpclient.c -o ../../bin/udpclient -lpthread
clean:
@echo "Cleaning up udpclient"

@ -10,12 +10,20 @@
#include <malloc.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <signal.h>
#include <time.h>
#define MSG_MAX 1024
#define STR_MAX 32
struct request {
char ip[32];
char port[32];
char msg[MSG_MAX];
};
static char *
remove_spaces(char *str)
@ -28,6 +36,41 @@ remove_spaces(char *str)
return str;
}
void *
send_fn(void *d)
{
struct request *r = (struct request *)d;
char resp[STR_MAX] = { 0 };
int fd = -1;
struct sockaddr_in sa;
sa.sin_family = AF_INET;
sa.sin_port = htons(atoi(r->port));
sa.sin_addr.s_addr = inet_addr(r->ip);
if ((fd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
perror("Establishing socket");
return NULL;
}
if (sendto(fd, r->msg, strlen(r->msg), 0, (struct sockaddr*)&sa, sizeof(sa)) < 0 &&
errno != EINTR) {
perror("sendto");
return NULL;
}
//todo: select rcv from!
int sa_len = sizeof(sa);
if (recvfrom(fd, resp, STR_MAX, 0, (struct sockaddr *)&sa, &sa_len) < 0) {
perror("recvfrom");
}
printf("Done[%s]!\n", resp);
close(fd);
free(r);
return NULL;
}
int main(int argc, char *argv[])
{
@ -47,10 +90,8 @@ int main(int argc, char *argv[])
char line[MSG_MAX] = { 0 };
while (fgets(line, MSG_MAX, f) != NULL) {
int fd = -1;
struct sockaddr_in sa;
char *msg = NULL, *tok, *src = line;
char ip[32] = { 0 }, port[32] = { 0 };
char ip[STR_MAX] = { 0 }, port[STR_MAX] = { 0 };
src = remove_spaces(src);
if (src[0] == ';') goto next;
@ -67,32 +108,22 @@ int main(int argc, char *argv[])
printf("Exiting!\n");
exit(0);
} else if (i == 1) {
pthread_t t;
printf("Proceeding!\n");
struct request *r = (struct request *)malloc(sizeof(struct request));
strncpy(r->ip, ip, STR_MAX);
strncpy(r->port, port, STR_MAX);
strncpy(r->msg, msg, MSG_MAX);
pthread_create(&t, NULL, send_fn, r);
} else {
printf("Skipping!\n");
goto next;
}
sa.sin_family = AF_INET;
sa.sin_port = htons(atoi(port));
sa.sin_addr.s_addr = inet_addr(ip);
if ((fd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
perror("Establishing socket");
return -1;
}
if (sendto(fd, msg, strlen(msg), 0, (struct sockaddr*)&sa, sizeof(sa)) < 0 &&
errno != EINTR) {
perror("sendto");
return -1;
}
printf("Done!\n");
next:
memset(line, 0, MSG_MAX);
fflush(stdin);
fflush(stdout);
if (fd >= 0) close(fd);
}
}

Loading…
Cancel
Save