feat: remove libuv, rework priority queue, DRY up http close

main
Sean McBride 4 years ago
parent 8aef688553
commit a83ae29d1b

@ -0,0 +1,64 @@
#pragma once
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "panic.h"
#include "debuglog.h"
#include "http_response.h"
#include "runtime.h"
#include "worker_thread.h"
/**
 * Deregisters the client socket from the worker's epoll instance, then closes it
 * Panics if the epoll deregistration fails; merely logs if close() fails
 * NOTE(review): calls epoll_ctl, but the visible includes lack <sys/epoll.h> —
 * presumably pulled in via runtime.h or worker_thread.h; confirm
 * @param client_socket - file descriptor of the client socket to close
 */
static inline void
client_socket_close(int client_socket)
{
	/* Remove from the worker's epoll interest list before closing the descriptor */
	int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, client_socket, NULL);
	if (unlikely(rc < 0)) panic_err();
	if (close(client_socket) < 0) debuglog("Error closing client socket - %s", strerror(errno));
}
/**
* Rejects request due to admission control or error
* @param client_socket - the client we are rejecting
* @param status_code - either 503 or 400
*/
/**
 * Rejects request due to admission control or error
 * Writes the full canned HTTP response to the client, retrying on EINTR
 * (the runtime delivers preemption signals, so writes may be interrupted)
 * @param client_socket - the client we are rejecting
 * @param status_code - either 503 or 400
 */
static inline void
client_socket_send(int client_socket, int status_code)
{
	const char *response;
	switch (status_code) {
	case 503:
		response = HTTP_RESPONSE_503_SERVICE_UNAVAILABLE;
		atomic_fetch_add(&runtime_total_5XX_responses, 1);
		break;
	case 400:
		response = HTTP_RESPONSE_400_BAD_REQUEST;
		break;
	default:
		panic("%d is not a valid status code\n", status_code);
	}

	/* Use size_t/ssize_t to match strlen/write and avoid signed/unsigned mismatch */
	size_t sent    = 0;
	size_t to_send = strlen(response);

	while (sent < to_send) {
		ssize_t rc = write(client_socket, &response[sent], to_send - sent);
		if (rc < 0) {
			/* Interrupted by a signal before any bytes were written: retry */
			if (errno == EINTR) continue;
			if (errno == EAGAIN) { debuglog("Unexpectedly blocking on write of %s\n", response); }

			goto send_err;
		}
		sent += (size_t)rc;
	}

done:
	return;
send_err:
	debuglog("Error sending to client: %s", strerror(errno));
	goto done;
}

@ -5,7 +5,6 @@
void current_sandbox_close_file_descriptor(int io_handle_index);
struct sandbox * current_sandbox_get(void);
int current_sandbox_get_file_descriptor(int io_handle_index);
union uv_any_handle *current_sandbox_get_libuv_handle(int io_handle_index);
int current_sandbox_initialize_io_handle(void);
void current_sandbox_set(struct sandbox *sandbox);
int current_sandbox_set_file_descriptor(int io_handle_index, int file_descriptor);

@ -2,10 +2,6 @@
#include <http_parser.h>
#include <sys/uio.h>
/* Conditionally load libuv */
#ifdef USE_HTTP_UVIO
#include <uv.h>
#endif
#include "http.h"
@ -30,11 +26,7 @@ struct http_response {
int body_length;
char * status;
int status_length;
#ifdef USE_HTTP_UVIO
uv_buf_t bufs[HTTP_MAX_HEADER_COUNT * 2 + 3]; /* max headers, one line for status code, remaining for body! */
#else
struct iovec bufs[HTTP_MAX_HEADER_COUNT * 2 + 3];
#endif
struct iovec bufs[HTTP_MAX_HEADER_COUNT * 2 + 3];
};
/***************************************************

@ -1,89 +0,0 @@
#pragma once
#include <assert.h>
#include <sys/mman.h>
#include <signal.h>
#include <uv.h>
#include "http_request.h"
#include "runtime.h"
#include "sandbox.h"
/**
* Parses data read by the libuv stream chunk-by-chunk until the message is complete
* Then stops the stream and wakes up the sandbox
* @param stream
* @param number_read bytes read
* @param buffer unused
*
* FIXME: is there some weird edge case where a UNICODE character might be split between reads? Do we care?
* Called after libuv has read a chunk of data. Issue #100
*/
static inline void
libuv_callbacks_on_read_parse_http_request(uv_stream_t *stream, ssize_t number_read, const uv_buf_t *buffer)
{
	/* The sandbox was stashed on the stream's user-data field by the caller */
	struct sandbox *sandbox = stream->data;
	/* Parse the chunks libuv has read on our behalf until we've parse to message end */
	if (number_read > 0) {
		// FIXME: Broken by refactor to sandbox_parse_http_request changes to return code
		if (sandbox_parse_http_request(sandbox, number_read) != 0) return;
		sandbox->request_response_data_length += number_read;
		struct http_request *rh = &sandbox->http_request;
		if (!rh->message_end) return;
	}
	/* When the entire message has been read, stop the stream and wakeup the sandbox */
	/* NOTE(review): number_read < 0 (libuv read error, e.g. UV_EOF) also reaches here,
	 * silently stopping the stream and waking the sandbox without any error flag — confirm intended */
	uv_read_stop(stream);
	worker_thread_wakeup_sandbox(sandbox);
}
/**
 * On libuv close, executes this callback to wake the blocked sandbox back up
 * NOTE(review): "sakebox" is a typo for "sandbox"; renaming requires updating the
 * uv_close call site, so it is left as-is here
 * @param stream handle whose data field holds the owning sandbox
 */
static inline void
libuv_callbacks_on_close_wakeup_sakebox(uv_handle_t *stream)
{
	struct sandbox *sandbox = stream->data;
	worker_thread_wakeup_sandbox(sandbox);
}
/**
 * On libuv shutdown, executes this callback to wake the blocked sandbox back up
 * NOTE(review): "sakebox" is a typo for "sandbox"; renaming requires updating the
 * uv_shutdown call site, so it is left as-is here
 * @param req shutdown request whose data field holds the owning sandbox
 * @param status unused in callback
 */
static inline void
libuv_callbacks_on_shutdown_wakeup_sakebox(uv_shutdown_t *req, int status)
{
	struct sandbox *sandbox = req->data;
	worker_thread_wakeup_sandbox(sandbox);
}
/**
 * On libuv write, executes this callback to wake the blocked sandbox back up
 * In case of error, shutdown the sandbox
 * @param write the completed write request whose data field holds the owning sandbox
 * @param status status code (negative indicates a write error)
 */
static inline void
libuv_callbacks_on_write_wakeup_sandbox(uv_write_t *write, int status)
{
	struct sandbox *sandbox = write->data;
	if (status < 0) {
		/* Write failed: initiate a shutdown; the sandbox is woken by the shutdown callback instead */
		sandbox->client_libuv_shutdown_request.data = sandbox;
		uv_shutdown(&sandbox->client_libuv_shutdown_request, (uv_stream_t *)&sandbox->client_libuv_stream,
		            libuv_callbacks_on_shutdown_wakeup_sakebox);
		return;
	}
	worker_thread_wakeup_sandbox(sandbox);
}
/**
 * libuv buffer-allocation callback
 * Points the read buffer at the unused tail of the sandbox's request_response_data
 * buffer, capped at the smaller of libuv's suggested size and the remaining
 * module-configured capacity (max_request_or_response_size)
 * @param h handle whose data field holds the owning sandbox
 * @param suggested buffer size suggested by libuv
 * @param buf out-parameter buffer descriptor to populate
 */
static inline void
libuv_callbacks_on_allocate_setup_request_response_data(uv_handle_t *h, size_t suggested, uv_buf_t *buf)
{
	struct sandbox *sandbox = h->data;
	size_t l = (sandbox->module->max_request_or_response_size - sandbox->request_response_data_length);
	buf->base = (sandbox->request_response_data + sandbox->request_response_data_length);
	buf->len = l > suggested ? suggested : l;
}

@ -1,7 +1,9 @@
#pragma once
#include <string.h>
#include <uv.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include "http.h"
#include "panic.h"

@ -4,6 +4,7 @@
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#define panic(fmt, ...) \
{ \

@ -5,8 +5,6 @@
#include "runtime.h"
#include "worker_thread.h"
#define MAX 4096
/**
* How to get the priority out of the generic element
* We assume priority is expressed as an unsigned 64-bit integer (i.e. cycles or
@ -19,31 +17,51 @@ typedef uint64_t (*priority_queue_get_priority_fn_t)(void *element);
/* We assume that priority is expressed in terms of a 64 bit unsigned integral */
struct priority_queue {
priority_queue_get_priority_fn_t get_priority_fn;
bool use_lock;
lock_t lock;
uint64_t highest_priority;
void * items[MAX];
int first_free;
priority_queue_get_priority_fn_t get_priority_fn;
size_t size;
size_t capacity;
void * items[];
};
/**
* Checks if a priority queue is empty
* @param self the priority queue to check
* @returns true if empty, false otherwise
* Peek at the priority of the highest priority task without having to take the lock
* Because this is a min-heap PQ, the highest priority is the lowest 64-bit integer
* This is used to store an absolute deadline
* @returns value of highest priority value in queue or ULONG_MAX if empty
*/
static inline bool
priority_queue_is_empty(struct priority_queue *self)
static inline uint64_t
priority_queue_peek(struct priority_queue *self)
{
return self->highest_priority == ULONG_MAX;
return self->highest_priority;
}
void priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_fn_t get_priority_fn);
int priority_queue_enqueue(struct priority_queue *self, void *value);
int priority_queue_dequeue(struct priority_queue *self, void **dequeued_element);
int priority_queue_dequeue_if_earlier(struct priority_queue *self, void **dequeued_element, uint64_t target_deadline);
int priority_queue_length(struct priority_queue *self);
struct priority_queue *
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn);
void priority_queue_free(struct priority_queue *self);
int priority_queue_length(struct priority_queue *self);
int priority_queue_length_nolock(struct priority_queue *self);
int priority_queue_enqueue(struct priority_queue *self, void *value);
int priority_queue_enqueue_nolock(struct priority_queue *self, void *value);
int priority_queue_delete(struct priority_queue *self, void *value);
int priority_queue_delete_nolock(struct priority_queue *self, void *value);
int priority_queue_dequeue(struct priority_queue *self, void **dequeued_element);
int priority_queue_dequeue_nolock(struct priority_queue *self, void **dequeued_element);
int priority_queue_dequeue_if_earlier(struct priority_queue *self, void **dequeued_element, uint64_t target_deadline);
int priority_queue_dequeue_if_earlier_nolock(struct priority_queue *self, void **dequeued_element,
uint64_t target_deadline);
uint64_t priority_queue_peek(struct priority_queue *self);
int priority_queue_delete(struct priority_queue *self, void *value);
int priority_queue_top(struct priority_queue *self, void **dequeued_element);
int priority_queue_top(struct priority_queue *self, void **dequeued_element);
int priority_queue_top_nolock(struct priority_queue *self, void **dequeued_element);
#endif /* PRIORITY_QUEUE_H */

@ -1,10 +1,10 @@
#pragma once
#include <ucontext.h>
#include <uv.h>
#include <stdbool.h>
#include "arch/context.h"
#include "client_socket.h"
#include "debuglog.h"
#include "deque.h"
#include "http_request.h"
@ -23,8 +23,7 @@
********************/
struct sandbox_io_handle {
int file_descriptor;
union uv_any_handle libuv_handle;
int file_descriptor;
};
typedef enum
@ -97,8 +96,6 @@ struct sandbox {
struct sandbox_io_handle io_handles[SANDBOX_MAX_IO_HANDLE_COUNT];
struct sockaddr client_address; /* client requesting connection! */
int client_socket_descriptor;
uv_tcp_t client_libuv_stream;
uv_shutdown_t client_libuv_shutdown_request;
bool is_repeat_header;
http_parser http_parser;
@ -227,7 +224,6 @@ sandbox_initialize_io_handle(struct sandbox *sandbox)
}
if (io_handle_index == SANDBOX_MAX_IO_HANDLE_COUNT) return -1;
sandbox->io_handles[io_handle_index].file_descriptor = SANDBOX_FILE_DESCRIPTOR_PREOPEN_MAGIC;
memset(&sandbox->io_handles[io_handle_index].libuv_handle, 0, sizeof(union uv_any_handle));
return io_handle_index;
}
@ -298,20 +294,6 @@ sandbox_close_file_descriptor(struct sandbox *sandbox, int io_handle_index)
sandbox->io_handles[io_handle_index].file_descriptor = -1;
}
/**
* Get the Libuv handle located at idx of the sandbox ith io_handle
* @param sandbox
* @param io_handle_index index of the io_handle that contains the libuv handle
* @returns any libuv handle or a NULL pointer in case of error
*/
static inline union uv_any_handle *
sandbox_get_libuv_handle(struct sandbox *sandbox, int io_handle_index)
{
if (!sandbox) return NULL;
if (io_handle_index >= SANDBOX_MAX_IO_HANDLE_COUNT || io_handle_index < 0) return NULL;
return &sandbox->io_handles[io_handle_index].libuv_handle;
}
/**
* Prints key performance metrics for a sandbox to STDOUT
* @param sandbox
@ -339,17 +321,7 @@ sandbox_close_http(struct sandbox *sandbox)
{
assert(sandbox != NULL);
#ifdef USE_HTTP_UVIO
uv_close((uv_handle_t *)&sandbox->client_libuv_stream, libuv_callbacks_on_close_wakeup_sakebox);
worker_thread_process_io();
#else
int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_DEL, sandbox->client_socket_descriptor, NULL);
if (unlikely(rc < 0)) panic_err();
if (close(sandbox->client_socket_descriptor) < 0) {
panic("Error closing client socket - %s", strerror(errno));
}
#endif
client_socket_close(sandbox->client_socket_descriptor);
}

@ -1,6 +1,7 @@
#pragma once
#include <stdbool.h>
#include <sys/socket.h>
#include "debuglog.h"
#include "deque.h"

@ -1,7 +1,5 @@
#pragma once
#include <uv.h>
#include "runtime.h"
#if NCORES == 1
@ -10,10 +8,9 @@
#define WORKER_THREAD_CORE_COUNT (NCORES > 1 ? NCORES - 1 : NCORES)
extern __thread uint64_t worker_thread_lock_duration;
extern __thread uint64_t worker_thread_start_timestamp;
extern __thread uv_loop_t worker_thread_uvio_handle;
extern __thread int worker_thread_epoll_file_descriptor;
extern __thread uint64_t worker_thread_lock_duration;
extern __thread uint64_t worker_thread_start_timestamp;
extern __thread int worker_thread_epoll_file_descriptor;
void *worker_thread_main(void *return_code);
@ -56,12 +53,3 @@ worker_thread_get_memory_string(uint32_t offset, uint32_t max_length)
}
return NULL;
}
/**
* Get global libuv handle
*/
static inline uv_loop_t *
worker_thread_get_libuv_handle(void)
{
return &worker_thread_uvio_handle;
}

@ -91,15 +91,3 @@ current_sandbox_close_file_descriptor(int io_handle_index)
struct sandbox *sandbox = current_sandbox_get();
sandbox_close_file_descriptor(sandbox, io_handle_index);
}
/**
* Get the Libuv handle located at idx of the sandbox ith io_handle
* @param io_handle_index index of the io_handle that contains the libuv handle
* @returns any libuv handle
*/
union uv_any_handle *
current_sandbox_get_libuv_handle(int io_handle_index)
{
struct sandbox *sandbox = current_sandbox_get();
return sandbox_get_libuv_handle(sandbox, io_handle_index);
}

@ -5,7 +5,7 @@
#include "priority_queue.h"
#include "runtime.h"
static struct priority_queue global_request_scheduler_minheap;
static struct priority_queue *global_request_scheduler_minheap;
/**
* Pushes a sandbox request to the global deque
@ -15,12 +15,11 @@ static struct priority_queue global_request_scheduler_minheap;
static struct sandbox_request *
global_request_scheduler_minheap_add(void *sandbox_request)
{
/* This function is called by both the listener core and workers */
#ifndef NDEBUG
if (runtime_is_worker()) assert(!software_interrupt_is_enabled());
#endif
assert(sandbox_request);
assert(global_request_scheduler_minheap);
if (unlikely(runtime_is_worker())) panic("%s is only callable by the listener thread\n", __func__);
int return_code = priority_queue_enqueue(&global_request_scheduler_minheap, sandbox_request);
int return_code = priority_queue_enqueue(global_request_scheduler_minheap, sandbox_request);
/* TODO: Propagate -1 to caller. Issue #91 */
if (return_code == -ENOSPC) panic("Request Queue is full\n");
return sandbox_request;
@ -34,7 +33,7 @@ int
global_request_scheduler_minheap_remove(struct sandbox_request **removed_sandbox_request)
{
assert(!software_interrupt_is_enabled());
return priority_queue_dequeue(&global_request_scheduler_minheap, (void **)removed_sandbox_request);
return priority_queue_dequeue(global_request_scheduler_minheap, (void **)removed_sandbox_request);
}
/**
@ -47,7 +46,7 @@ global_request_scheduler_minheap_remove_if_earlier(struct sandbox_request **remo
uint64_t target_deadline)
{
assert(!software_interrupt_is_enabled());
return priority_queue_dequeue_if_earlier(&global_request_scheduler_minheap, (void **)removed_sandbox_request,
return priority_queue_dequeue_if_earlier(global_request_scheduler_minheap, (void **)removed_sandbox_request,
target_deadline);
}
@ -60,7 +59,7 @@ global_request_scheduler_minheap_remove_if_earlier(struct sandbox_request **remo
static uint64_t
global_request_scheduler_minheap_peek(void)
{
return priority_queue_peek(&global_request_scheduler_minheap);
return priority_queue_peek(global_request_scheduler_minheap);
}
uint64_t
@ -77,7 +76,7 @@ sandbox_request_get_priority_fn(void *element)
void
global_request_scheduler_minheap_initialize()
{
priority_queue_initialize(&global_request_scheduler_minheap, sandbox_request_get_priority_fn);
global_request_scheduler_minheap = priority_queue_initialize(1000, true, sandbox_request_get_priority_fn);
struct global_request_scheduler_config config = {
.add_fn = global_request_scheduler_minheap_add,
@ -88,3 +87,9 @@ global_request_scheduler_minheap_initialize()
global_request_scheduler_initialize(&config);
}
/**
 * Frees the heap-allocated priority queue backing the global request scheduler minheap
 */
void
global_request_scheduler_minheap_free()
{
	priority_queue_free(global_request_scheduler_minheap);
}

@ -1,5 +1,3 @@
#include <uv.h>
#include "http.h"
#include "http_request.h"
#include "http_response.h"

@ -1,7 +1,4 @@
#include <assert.h>
#ifdef USE_HTTP_UVIO
#include <uv.h>
#endif
#include "http_response.h"
@ -19,25 +16,6 @@ http_response_encode_as_vector(struct http_response *http_response)
{
int buffer_count = 0;
#ifdef USE_HTTP_UVIO
http_response->bufs[buffer_count] = uv_buf_init(http_response->status, http_response->status_length);
buffer_count++;
for (int i = 0; i < http_response->header_count; i++) {
http_response->bufs[buffer_count] = uv_buf_init(http_response->headers[i].header,
http_response->headers[i].length);
buffer_count++;
}
if (http_response->body) {
http_response->bufs[buffer_count] = uv_buf_init(http_response->body, http_response->body_length);
buffer_count++;
http_response->bufs[buffer_count] = uv_buf_init(http_response->status + http_response->status_length
- 2,
2); /* for crlf */
buffer_count++;
}
#else
http_response->bufs[buffer_count].iov_base = http_response->status;
http_response->bufs[buffer_count].iov_len = http_response->status_length;
buffer_count++;
@ -57,7 +35,6 @@ http_response_encode_as_vector(struct http_response *http_response)
http_response->bufs[buffer_count].iov_len = 2;
buffer_count++;
}
#endif
return buffer_count;
}

@ -1,14 +1,14 @@
#ifndef USE_HTTP_UVIO
/*
* This code originally came from the aWsm compiler
* It has since been updated
* https://github.com/gwsystems/aWsm/blob/master/runtime/libc/libc_backing.c
*/
#include <current_sandbox.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include "current_sandbox.h"
// What should we tell the child program its UID and GID are?
#define UID 0xFF
@ -786,5 +786,3 @@ inner_syscall_handler(int32_t n, int32_t a, int32_t b, int32_t c, int32_t d, int
return 0;
}
#endif

File diff suppressed because it is too large Load Diff

@ -1,3 +1,4 @@
#include "client_socket.h"
#include "debuglog.h"
#include "local_runqueue_list.h"
#include "local_runqueue.h"
@ -51,18 +52,17 @@ local_runqueue_list_get_next()
if (global_request_scheduler_remove(&sandbox_request) < 0) goto err;
struct sandbox *sandbox = sandbox_allocate(sandbox_request);
if (!sandbox) goto sandbox_allocate_err;
if (!sandbox) goto err;
sandbox->state = SANDBOX_RUNNABLE;
local_runqueue_add(sandbox);
done:
return sandbox;
sandbox_allocate_err:
debuglog("local_runqueue_list_get_next failed to allocate sandbox, returning request to global request "
"scheduler\n");
global_request_scheduler_add(sandbox_request);
err:
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor);
free(sandbox_request);
sandbox = NULL;
goto done;
}

@ -1,5 +1,6 @@
#include <stdint.h>
#include "client_socket.h"
#include "current_sandbox.h"
#include "debuglog.h"
#include "global_request_scheduler.h"
@ -9,7 +10,7 @@
#include "priority_queue.h"
#include "software_interrupt.h"
__thread static struct priority_queue local_runqueue_minheap;
__thread static struct priority_queue *local_runqueue_minheap;
/**
* Checks if the run queue is empty
@ -18,7 +19,7 @@ __thread static struct priority_queue local_runqueue_minheap;
bool
local_runqueue_minheap_is_empty()
{
return priority_queue_is_empty(&local_runqueue_minheap);
return priority_queue_length_nolock(local_runqueue_minheap) == 0;
}
/**
@ -31,7 +32,7 @@ local_runqueue_minheap_add(struct sandbox *sandbox)
{
assert(!software_interrupt_is_enabled());
int return_code = priority_queue_enqueue(&local_runqueue_minheap, sandbox);
int return_code = priority_queue_enqueue_nolock(local_runqueue_minheap, sandbox);
/* TODO: propagate RC to caller. Issue #92 */
if (return_code == -ENOSPC) panic("Thread Runqueue is full!\n");
}
@ -45,7 +46,7 @@ static int
local_runqueue_minheap_remove(struct sandbox **to_remove)
{
assert(!software_interrupt_is_enabled());
return priority_queue_dequeue(&local_runqueue_minheap, (void **)to_remove);
return priority_queue_dequeue_nolock(local_runqueue_minheap, (void **)to_remove);
}
/**
@ -58,7 +59,7 @@ local_runqueue_minheap_delete(struct sandbox *sandbox)
assert(!software_interrupt_is_enabled());
assert(sandbox != NULL);
int rc = priority_queue_delete(&local_runqueue_minheap, sandbox);
int rc = priority_queue_delete_nolock(local_runqueue_minheap, sandbox);
if (rc == -1) panic("Tried to delete sandbox %lu from runqueue, but was not present\n", sandbox->id);
}
@ -77,9 +78,9 @@ local_runqueue_minheap_get_next()
struct sandbox * sandbox = NULL;
struct sandbox_request *sandbox_request = NULL;
int sandbox_rc = priority_queue_top(&local_runqueue_minheap, (void **)&sandbox);
int sandbox_rc = priority_queue_top_nolock(local_runqueue_minheap, (void **)&sandbox);
if (sandbox_rc == -ENOENT && global_request_scheduler_peek() < ULONG_MAX) {
while (sandbox_rc == -ENOENT && global_request_scheduler_peek() < ULONG_MAX && sandbox == NULL) {
/* local runqueue empty, try to pull a sandbox request */
if (global_request_scheduler_remove(&sandbox_request) < 0) {
/* Assumption: Sandbox request should not be set in case of an error */
@ -87,9 +88,14 @@ local_runqueue_minheap_get_next()
goto done;
}
/* Try to allocate a sandbox, returning the request on failure */
/* Try to allocate a sandbox. Try again on failure */
sandbox = sandbox_allocate(sandbox_request);
if (!sandbox) goto sandbox_allocate_err;
if (!sandbox) {
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor);
free(sandbox_request);
continue;
};
assert(sandbox->state == SANDBOX_INITIALIZED);
sandbox_set_as_runnable(sandbox, SANDBOX_INITIALIZED);
@ -97,10 +103,6 @@ local_runqueue_minheap_get_next()
done:
return sandbox;
sandbox_allocate_err:
debuglog("local_runqueue_minheap_get_next failed to allocate sandbox. Adding request back to global "
"request scheduler\n");
global_request_scheduler_add(sandbox_request);
err:
sandbox = NULL;
goto done;
@ -132,9 +134,8 @@ local_runqueue_minheap_preempt(ucontext_t *user_context)
assert(local_runqueue_minheap_is_empty() == false);
bool should_enable_software_interrupt = true;
uint64_t local_deadline = priority_queue_peek(&local_runqueue_minheap);
uint64_t local_deadline = priority_queue_peek(local_runqueue_minheap);
uint64_t global_deadline = global_request_scheduler_peek();
/* If we're able to get a sandbox request with a tighter deadline, preempt the current context and run it */
struct sandbox_request *sandbox_request = NULL;
if (global_deadline < local_deadline) {
@ -191,10 +192,9 @@ done:
if (should_enable_software_interrupt) software_interrupt_enable();
return;
err_sandbox_allocate:
assert(sandbox_request);
debuglog("local_runqueue_minheap_preempt failed to allocate sandbox, returning request to global request "
"scheduler\n");
global_request_scheduler_add(sandbox_request);
client_socket_send(sandbox_request->socket_descriptor, 503);
client_socket_close(sandbox_request->socket_descriptor);
debuglog("local_runqueue_minheap_preempt failed to allocate sandbox\n");
err:
goto done;
}
@ -215,7 +215,7 @@ local_runqueue_minheap_initialize()
{
/* Initialize local state */
software_interrupt_disable();
priority_queue_initialize(&local_runqueue_minheap, sandbox_get_priority);
local_runqueue_minheap = priority_queue_initialize(256, false, sandbox_get_priority);
software_interrupt_enable();
/* Register Function Pointers for Abstract Scheduling API */

@ -3,8 +3,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include <uv.h>
#include "debuglog.h"
#include "http.h"

@ -23,14 +23,13 @@ priority_queue_append(struct priority_queue *self, void *new_item)
{
assert(self != NULL);
assert(new_item != NULL);
assert(LOCK_IS_LOCKED(&self->lock));
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
int rc;
if (self->first_free >= MAX) goto err_enospc;
self->items[self->first_free++] = new_item;
/* Add one and prefix because we use 1-based indices in our backing array */
if (self->size + 1 == self->capacity) goto err_enospc;
self->items[++self->size] = new_item;
rc = 0;
done:
@ -40,6 +39,21 @@ err_enospc:
goto done;
}
/**
 * Checks if a priority queue is empty
 * Caller must already hold the lock when the queue is lock-protected
 * @param self the priority queue to check
 * @returns true if empty, false otherwise
 */
static inline bool
priority_queue_is_empty(struct priority_queue *self)
{
	assert(self != NULL);
	assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
	assert(!runtime_is_worker() || !software_interrupt_is_enabled());
	return self->size == 0;
}
/**
* Shifts an appended value upwards to restore heap structure property
* @param self the priority queue
@ -49,9 +63,15 @@ priority_queue_percolate_up(struct priority_queue *self)
{
assert(self != NULL);
assert(self->get_priority_fn != NULL);
assert(LOCK_IS_LOCKED(&self->lock));
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
for (int i = self->first_free - 1;
/* If there's only one element, set memoized lookup and early out */
if (self->size == 1) {
self->highest_priority = self->get_priority_fn(self->items[1]);
return;
}
for (int i = self->size;
i / 2 != 0 && self->get_priority_fn(self->items[i]) < self->get_priority_fn(self->items[i / 2]); i /= 2) {
assert(self->get_priority_fn(self->items[i]) != ULONG_MAX);
void *temp = self->items[i / 2];
@ -72,9 +92,9 @@ static inline int
priority_queue_find_smallest_child(struct priority_queue *self, int parent_index)
{
assert(self != NULL);
assert(parent_index >= 1 && parent_index < self->first_free);
assert(parent_index >= 1 && parent_index <= self->size);
assert(self->get_priority_fn != NULL);
assert(LOCK_IS_LOCKED(&self->lock));
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
int left_child_index = 2 * parent_index;
int right_child_index = 2 * parent_index + 1;
@ -83,7 +103,7 @@ priority_queue_find_smallest_child(struct priority_queue *self, int parent_index
int smallest_child_idx;
/* If we don't have a right child or the left child is smaller, return it */
if (right_child_index == self->first_free) {
if (right_child_index > self->size) {
smallest_child_idx = left_child_index;
} else if (self->get_priority_fn(self->items[left_child_index])
< self->get_priority_fn(self->items[right_child_index])) {
@ -106,12 +126,12 @@ priority_queue_percolate_down(struct priority_queue *self, int parent_index)
{
assert(self != NULL);
assert(self->get_priority_fn != NULL);
assert(LOCK_IS_LOCKED(&self->lock));
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
assert(runtime_is_worker());
assert(!software_interrupt_is_enabled());
int left_child_index = 2 * parent_index;
while (left_child_index >= 2 && left_child_index < self->first_free) {
while (left_child_index >= 2 && left_child_index <= self->size) {
int smallest_child_index = priority_queue_find_smallest_child(self, parent_index);
/* Once the parent is equal to or less than its smallest child, break; */
if (self->get_priority_fn(self->items[parent_index])
@ -125,21 +145,15 @@ priority_queue_percolate_down(struct priority_queue *self, int parent_index)
parent_index = smallest_child_index;
left_child_index = 2 * parent_index;
}
}
/**
* Checks if a priority queue is empty
* @param self the priority queue to check
* @returns true if empty, false otherwise
*/
static inline bool
priority_queue_is_empty_locked(struct priority_queue *self)
{
assert(self != NULL);
assert(LOCK_IS_LOCKED(&self->lock));
assert(!runtime_is_worker() || !software_interrupt_is_enabled());
return self->first_free == 1;
/* Update memoized value if we touched the head */
if (parent_index == 1) {
if (!priority_queue_is_empty(self)) {
self->highest_priority = self->get_priority_fn(self->items[1]);
} else {
self->highest_priority = ULONG_MAX;
}
}
}
/*********************
@ -148,24 +162,44 @@ priority_queue_is_empty_locked(struct priority_queue *self)
/**
* Initialized the Priority Queue Data structure
* @param self the priority_queue to initialize
* @param capacity the number of elements to store in the data structure
* @param use_lock indicates that we want a concurrent data structure
* @param get_priority_fn pointer to a function that returns the priority of an element
* @return priority queue
*/
void
priority_queue_initialize(struct priority_queue *self, priority_queue_get_priority_fn_t get_priority_fn)
struct priority_queue *
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn)
{
assert(self != NULL);
assert(get_priority_fn != NULL);
assert(!runtime_is_worker() || !software_interrupt_is_enabled());
memset(self->items, 0, sizeof(void *) * MAX);
/* Add one to capacity because this data structure ignores the element at 0 */
struct priority_queue *self = calloc(sizeof(struct priority_queue) + sizeof(void *) * (capacity + 1), 1);
LOCK_INIT(&self->lock);
self->first_free = 1;
self->get_priority_fn = get_priority_fn;
/* We're assuming a min-heap implementation, so set to largest possible value */
self->highest_priority = ULONG_MAX;
self->size = 0;
self->capacity = capacity;
self->get_priority_fn = get_priority_fn;
self->use_lock = use_lock;
if (use_lock) LOCK_INIT(&self->lock);
return self;
}
/**
 * Free the Priority Queue Data structure
 * @param self the priority_queue to free (allocated by priority_queue_initialize)
 */
void
priority_queue_free(struct priority_queue *self)
{
	assert(self != NULL);
	assert(!runtime_is_worker() || !software_interrupt_is_enabled());
	free(self);
}
/**
@ -173,16 +207,27 @@ priority_queue_initialize(struct priority_queue *self, priority_queue_get_priori
* @returns the number of elements in the priority queue
*/
int
priority_queue_length(struct priority_queue *self)
priority_queue_length_nolock(struct priority_queue *self)
{
assert(self != NULL);
assert(runtime_is_worker());
assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
return self->size;
}
/**
* @param self the priority_queue
* @returns the number of elements in the priority queue
*/
int
priority_queue_length(struct priority_queue *self)
{
LOCK_LOCK(&self->lock);
int length = self->first_free - 1;
int size = priority_queue_length_nolock(self);
LOCK_UNLOCK(&self->lock);
return length;
return size;
}
/**
@ -191,33 +236,42 @@ priority_queue_length(struct priority_queue *self)
* @returns 0 on success. -ENOSPC on full.
*/
int
priority_queue_enqueue(struct priority_queue *self, void *value)
priority_queue_enqueue_nolock(struct priority_queue *self, void *value)
{
assert(self != NULL);
assert(value != NULL);
assert(!runtime_is_worker() || !software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
int rc;
LOCK_LOCK(&self->lock);
if (priority_queue_append(self, value) == -ENOSPC) goto err_enospc;
/* If this is the first element we add, update the highest priority */
if (self->first_free == 2) {
self->highest_priority = self->get_priority_fn(value);
} else {
priority_queue_percolate_up(self);
}
priority_queue_percolate_up(self);
rc = 0;
release_lock:
LOCK_UNLOCK(&self->lock);
done:
return rc;
err_enospc:
rc = -ENOSPC;
goto release_lock;
goto done;
}
/**
* @param self - the priority queue we want to add to
* @param value - the value we want to add
* @returns 0 on success. -ENOSPC on full.
*/
int
priority_queue_enqueue(struct priority_queue *self, void *value)
{
int rc;
LOCK_LOCK(&self->lock);
rc = priority_queue_enqueue_nolock(self, value);
LOCK_UNLOCK(&self->lock);
return rc;
}
/**
@ -226,29 +280,41 @@ err_enospc:
* @returns 0 on success. -1 on not found
*/
int
priority_queue_delete(struct priority_queue *self, void *value)
priority_queue_delete_nolock(struct priority_queue *self, void *value)
{
assert(self != NULL);
assert(value != NULL);
assert(runtime_is_worker());
assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
LOCK_LOCK(&self->lock);
bool did_delete = false;
for (int i = 1; i < self->first_free; i++) {
for (int i = 1; i <= self->size; i++) {
if (self->items[i] == value) {
self->items[i] = self->items[--self->first_free];
self->items[self->first_free] = NULL;
self->items[i] = self->items[self->size];
self->items[self->size--] = NULL;
priority_queue_percolate_down(self, i);
did_delete = true;
return 0;
}
}
return -1;
}
/**
* @param self - the priority queue we want to delete from
* @param value - the value we want to delete
* @returns 0 on success. -1 on not found
*/
int
priority_queue_delete(struct priority_queue *self, void *value)
{
int rc;
LOCK_LOCK(&self->lock);
rc = priority_queue_delete_nolock(self, value);
LOCK_UNLOCK(&self->lock);
if (!did_delete) return -1;
return 0;
return rc;
}
/**
@ -262,6 +328,17 @@ priority_queue_dequeue(struct priority_queue *self, void **dequeued_element)
return priority_queue_dequeue_if_earlier(self, dequeued_element, UINT64_MAX);
}
/**
* @param self - the priority queue we want to add to
* @param dequeued_element a pointer to set to the dequeued element
* @returns RC 0 if successfully set dequeued_element, -ENOENT if empty
*/
int
priority_queue_dequeue_nolock(struct priority_queue *self, void **dequeued_element)
{
	/* An unconditional dequeue is a dequeue-if-earlier with the maximum possible deadline */
	int rc = priority_queue_dequeue_if_earlier_nolock(self, dequeued_element, UINT64_MAX);
	return rc;
}
/**
* @param self - the priority queue we want to add to
* @param dequeued_element a pointer to set to the dequeued element
@ -269,42 +346,57 @@ priority_queue_dequeue(struct priority_queue *self, void **dequeued_element)
* @returns RC 0 if successfully set dequeued_element, -ENOENT if empty or if none meet target_deadline
*/
int
priority_queue_dequeue_if_earlier(struct priority_queue *self, void **dequeued_element, uint64_t target_deadline)
priority_queue_dequeue_if_earlier_nolock(struct priority_queue *self, void **dequeued_element, uint64_t target_deadline)
{
assert(self != NULL);
assert(dequeued_element != NULL);
assert(self->get_priority_fn != NULL);
assert(runtime_is_worker());
assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
int return_code;
LOCK_LOCK(&self->lock);
/* If the dequeue is not higher priority (earlier timestamp) than targed_deadline, return immediately */
if (priority_queue_is_empty_locked(self) || self->highest_priority >= target_deadline) goto err_enoent;
if (priority_queue_is_empty(self) || self->highest_priority >= target_deadline) goto err_enoent;
*dequeued_element = self->items[1];
self->items[1] = self->items[self->size];
self->items[self->size--] = NULL;
*dequeued_element = self->items[1];
self->items[1] = self->items[--self->first_free];
self->items[self->first_free] = NULL;
/* Because of 1-based indices, first_free is 2 when there is only one element */
if (self->first_free > 2) priority_queue_percolate_down(self, 1);
if (self->size > 1) priority_queue_percolate_down(self, 1);
/* Update the highest priority */
if (!priority_queue_is_empty_locked(self)) {
if (!priority_queue_is_empty(self)) {
self->highest_priority = self->get_priority_fn(self->items[1]);
} else {
self->highest_priority = ULONG_MAX;
}
return_code = 0;
release_lock:
LOCK_UNLOCK(&self->lock);
done:
return return_code;
err_enoent:
return_code = -ENOENT;
goto release_lock;
goto done;
}
/**
* @param self - the priority queue we want to add to
* @param dequeued_element a pointer to set to the dequeued element
* @param target_deadline the deadline that the request must be earlier than in order to dequeue
* @returns RC 0 if successfully set dequeued_element, -ENOENT if empty or if none meet target_deadline
*/
int
priority_queue_dequeue_if_earlier(struct priority_queue *self, void **dequeued_element, uint64_t target_deadline)
{
	/* Locked wrapper around the nolock variant */
	LOCK_LOCK(&self->lock);
	int return_code = priority_queue_dequeue_if_earlier_nolock(self, dequeued_element, target_deadline);
	LOCK_UNLOCK(&self->lock);
	return return_code;
}
/**
@ -314,40 +406,43 @@ err_enoent:
* @returns RC 0 if successfully set dequeued_element, -ENOENT if empty
*/
int
priority_queue_top(struct priority_queue *self, void **dequeued_element)
priority_queue_top_nolock(struct priority_queue *self, void **dequeued_element)
{
assert(self != NULL);
assert(dequeued_element != NULL);
assert(self->get_priority_fn != NULL);
assert(runtime_is_worker());
assert(!software_interrupt_is_enabled());
assert(!self->use_lock || LOCK_IS_LOCKED(&self->lock));
int return_code;
LOCK_LOCK(&self->lock);
if (priority_queue_is_empty_locked(self)) goto err_enoent;
if (priority_queue_is_empty(self)) goto err_enoent;
*dequeued_element = self->items[1];
return_code = 0;
release_lock:
LOCK_UNLOCK(&self->lock);
done:
return return_code;
err_enoent:
return_code = -ENOENT;
goto release_lock;
goto done;
}
/**
 * Returns the top of the priority queue without removing it
 * Because this is a min-heap PQ storing absolute deadlines, the top is the lowest 64-bit value
 * @param self - the priority queue we want to peek at
 * @param dequeued_element a pointer to set to the top element
 * @returns RC 0 if successfully set dequeued_element, -ENOENT if empty
 */
uint64_t
priority_queue_peek(struct priority_queue *self)
int
priority_queue_top(struct priority_queue *self, void **dequeued_element)
{
return self->highest_priority;
int return_code;
LOCK_LOCK(&self->lock);
return_code = priority_queue_top_nolock(self, dequeued_element);
LOCK_UNLOCK(&self->lock);
return return_code;
}

@ -1,9 +1,9 @@
#include <signal.h>
#include <sched.h>
#include <sys/mman.h>
#include <uv.h>
#include "arch/context.h"
#include "client_socket.h"
#include "debuglog.h"
#include "global_request_scheduler_deque.h"
#include "global_request_scheduler_minheap.h"
@ -151,39 +151,6 @@ runtime_initialize(void)
* Listener Thread Logic *
************************/
/**
* Rejects Requests as determined by admissions control
* @param client_socket - the client we are rejecting
*/
static inline void
listener_thread_reject(int client_socket)
{
	assert(client_socket >= 0);

	int rc;
	int sent    = 0;
	int to_send = strlen(HTTP_RESPONSE_503_SERVICE_UNAVAILABLE);

	/* Loop until the full response is written; write may return short counts */
	while (sent < to_send) {
		rc = write(client_socket, &HTTP_RESPONSE_503_SERVICE_UNAVAILABLE[sent], to_send - sent);
		if (rc < 0) {
			/* NOTE(review): retrying on EAGAIN busy-waits on a non-blocking socket — confirm acceptable here */
			if (errno == EAGAIN) continue;

			goto send_err;
		}
		sent += rc;
	};

	atomic_fetch_add(&runtime_total_5XX_responses, 1);

close:
	if (close(client_socket) < 0) panic("Error closing client socket - %s", strerror(errno));

	return;
send_err:
	/* Fixed: this path sends a 503, but previously logged "Error sending 504" */
	debuglog("Error sending 503: %s", strerror(errno));
	goto close;
}
/**
* @brief Execution Loop of the listener core, io_handles HTTP requests, allocates sandbox request objects, and pushes
* the sandbox object to the global dequeue
@ -285,7 +252,9 @@ listener_thread_main(void *dummy)
/ module->relative_deadline;
if (runtime_admitted + admissions_estimate >= runtime_admissions_capacity) {
listener_thread_reject(client_socket);
client_socket_send(client_socket, 503);
if (close(client_socket) < 0)
debuglog("Error closing client socket - %s", strerror(errno));
continue;
}

@ -2,12 +2,10 @@
#include <pthread.h>
#include <signal.h>
#include <sys/mman.h>
#include <uv.h>
#include "current_sandbox.h"
#include "debuglog.h"
#include "http_parser_settings.h"
#include "libuv_callbacks.h"
#include "local_completion_queue.h"
#include "local_runqueue.h"
#include "panic.h"
@ -60,8 +58,6 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
int rc = 0;
#ifndef USE_HTTP_UVIO
while (!sandbox->http_request.message_end) {
/* Read from the Socket */
@ -106,13 +102,6 @@ sandbox_receive_and_parse_client_request(struct sandbox *sandbox)
sandbox->request_length = sandbox->request_response_data_length;
#else
rc = uv_read_start((uv_stream_t *)&sandbox->client_libuv_stream,
libuv_callbacks_on_allocate_setup_request_response_data,
libuv_callbacks_on_read_parse_http_request);
worker_thread_process_io();
#endif
rc = 0;
done:
return rc;
@ -204,11 +193,10 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox)
sandbox->total_time = end_time - sandbox->request_arrival_timestamp;
uint64_t total_time_us = sandbox->total_time / runtime_processor_speed_MHz;
#ifndef USE_HTTP_UVIO
int rc;
int sent = 0;
while (sent < response_cursor) {
rc = write(sandbox->client_socket_descriptor, sandbox->request_response_data + sent,
rc = write(sandbox->client_socket_descriptor, &sandbox->request_response_data[sent],
response_cursor - sent);
if (rc < 0) {
if (errno == EAGAIN)
@ -221,15 +209,6 @@ sandbox_build_and_send_client_response(struct sandbox *sandbox)
sent += rc;
}
#else
uv_write_t req = {
.data = sandbox,
};
uv_buf_t bufv = uv_buf_init(sandbox->request_response_data, response_cursor);
int r = uv_write(&req, (uv_stream_t *)&sandbox->client_libuv_stream, &bufv, 1,
libuv_callbacks_on_write_wakeup_sandbox);
worker_thread_process_io();
#endif
return 0;
}
@ -243,19 +222,6 @@ sandbox_open_http(struct sandbox *sandbox)
/* Set the sandbox as the data the http-parser has access to */
sandbox->http_parser.data = sandbox;
#ifdef USE_HTTP_UVIO
/* Initialize libuv TCP stream */
int r = uv_tcp_init(worker_thread_get_libuv_handle(), (uv_tcp_t *)&sandbox->client_libuv_stream);
assert(r == 0);
/* Set the current sandbox as the data the libuv callbacks have access to */
sandbox->client_libuv_stream.data = sandbox;
/* Open the libuv TCP stream */
r = uv_tcp_open((uv_tcp_t *)&sandbox->client_libuv_stream, sandbox->client_socket_descriptor);
assert(r == 0);
#else
/* Freshly allocated sandbox going runnable for first time, so register client socket with epoll */
struct epoll_event accept_evt;
accept_evt.data.ptr = (void *)sandbox;
@ -263,7 +229,6 @@ sandbox_open_http(struct sandbox *sandbox)
int rc = epoll_ctl(worker_thread_epoll_file_descriptor, EPOLL_CTL_ADD, sandbox->client_socket_descriptor,
&accept_evt);
if (unlikely(rc < 0)) panic_err();
#endif
}
/**
@ -357,24 +322,7 @@ err:
assert(sandbox->state == SANDBOX_RUNNING);
/* Send a 400 error back to the client */
rc = 0;
int sent = 0;
int to_send = strlen(HTTP_RESPONSE_400_BAD_REQUEST);
while (sent < to_send) {
rc = write(sandbox->client_socket_descriptor, &HTTP_RESPONSE_400_BAD_REQUEST[sent], to_send - sent);
if (rc < 0) {
if (errno == EAGAIN) {
debuglog("Unexpectedly blocking on write of 4XX error");
worker_thread_block_current_sandbox();
continue;
}
debuglog("Failed to send 400: %s", strerror(errno));
break;
}
sent += rc;
}
client_socket_send(sandbox->client_socket_descriptor, 400);
#ifdef LOG_TOTAL_REQS_RESPS
if (rc >= 0) {
@ -384,7 +332,7 @@ err:
#endif
software_interrupt_disable();
sandbox_close_http(sandbox);
client_socket_close(sandbox->client_socket_descriptor);
sandbox_set_as_error(sandbox, SANDBOX_RUNNING);
goto done;
}
@ -876,7 +824,6 @@ sandbox_set_as_complete(struct sandbox *sandbox, sandbox_state_t last_state)
/*
* TODO: Enhance to include "spinning" or better "local|global scheduling latency" as well.
* Given the async I/O model of libuv, it is ambiguous how to model "spinning"
*/
perf_window_add(&sandbox->module->perf_window, sandbox->running_duration);
@ -946,9 +893,9 @@ err_stack_allocation_failed:
sandbox->state = SANDBOX_SET_AS_INITIALIZED;
sandbox->last_state_change_timestamp = now;
ps_list_init_d(sandbox);
sandbox_set_as_error(sandbox, SANDBOX_SET_AS_INITIALIZED);
err_memory_allocation_failed:
err:
sandbox_set_as_error(sandbox, SANDBOX_SET_AS_INITIALIZED);
perror(error_message);
sandbox = NULL;
goto done;

@ -82,9 +82,9 @@ sigalrm_handler(siginfo_t *signal_info, ucontext_t *user_context, struct sandbox
if (!software_interrupt_is_enabled()) return;
/*
 * if a SIGALRM fires while the worker thread is between sandboxes doing runtime tasks such as processing
 * the epoll loop, performing completion queue cleanup, etc. current_sandbox might be NULL. In this case,
 * we should just allow return to allow the worker thread to run the main loop until it loads a new sandbox.
*
* TODO: Consider if this should be an invarient and the worker thread should disable software
* interrupts when doing this work. Issue #95

@ -4,8 +4,8 @@
#include <sched.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <uv.h>
#include "client_socket.h"
#include "current_sandbox.h"
#include "debuglog.h"
#include "global_request_scheduler.h"
@ -24,15 +24,7 @@
/* context of the runtime thread before running sandboxes or to resume its "main". */
__thread struct arch_context worker_thread_base_context;
#ifdef USE_HTTP_UVIO
/* libuv i/o loop handle per sandboxing thread! */
__thread uv_loop_t worker_thread_uvio_handle;
/* Flag to signify if the thread is currently running callbacks in the libuv event loop */
static __thread bool worker_thread_is_in_libuv_event_loop = false;
#else
__thread int worker_thread_epoll_file_descriptor;
#endif /* USE_HTTP_UVIO */
/* Total Lock Contention in Cycles */
__thread uint64_t worker_thread_lock_duration;
@ -189,9 +181,6 @@ worker_thread_wakeup_sandbox(struct sandbox *sandbox)
void
worker_thread_block_current_sandbox(void)
{
#ifdef USE_HTTP_UVIO
assert(worker_thread_is_in_libuv_event_loop == false);
#endif
software_interrupt_disable();
/* Remove the sandbox we were just executing from the runqueue and mark as blocked */
@ -200,53 +189,19 @@ worker_thread_block_current_sandbox(void)
assert(current_sandbox->state == SANDBOX_RUNNING);
sandbox_set_as_blocked(current_sandbox, SANDBOX_RUNNING);
worker_thread_switch_to_base_context();
}
/**
* Execute I/O
*/
void
worker_thread_process_io(void)
{
#ifdef USE_HTTP_UVIO
#ifdef USE_HTTP_SYNC
/*
* TODO: realistically, we're processing all async I/O on this core when a sandbox blocks on http processing,
* not great! if there is a way, perhaps RUN_ONCE and check if your I/O is processed, if yes,
* return else do async block! Issue #98
*/
uv_run(worker_thread_get_libuv_handle(), UV_RUN_DEFAULT);
#else /* USE_HTTP_SYNC */
worker_thread_block_current_sandbox();
#endif /* USE_HTTP_UVIO */
#else
assert(false);
/* it should not be called if not using uvio for http */
#endif
/* The worker thread seems to "spin" on a blocked sandbox, so try to execute another sandbox for one quantum
* after blocking to give time for the action to resolve */
struct sandbox *next_sandbox = local_runqueue_get_next();
if (next_sandbox != NULL) {
worker_thread_switch_to_sandbox(next_sandbox);
} else {
worker_thread_switch_to_base_context();
};
}
/**
* Run all outstanding events in the local thread's libuv event loop
*/
void
worker_thread_execute_libuv_event_loop(void)
{
#ifdef USE_HTTP_UVIO
	worker_thread_is_in_libuv_event_loop = true;
	/* Drain the loop: uv_run(UV_RUN_NOWAIT) returns nonzero while handles/requests remain.
	 * Fixed: removed unused variable `i` from the original declaration. */
	int n = uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT);
	while (n > 0) {
		n--;
		uv_run(worker_thread_get_libuv_handle(), UV_RUN_NOWAIT);
	}
	worker_thread_is_in_libuv_event_loop = false;
#endif
	return;
}
/**
* Run all outstanding events in the local thread's libuv event loop
* Run all outstanding events in the local thread's epoll loop
*/
static inline void
worker_thread_execute_epoll_loop(void)
@ -295,7 +250,8 @@ worker_thread_execute_epoll_loop(void)
case SANDBOX_ERROR:
panic("Expected to have closed socket");
default:
sandbox_close_http(sandbox);
client_socket_send(sandbox->client_socket_descriptor, 503);
client_socket_close(sandbox->client_socket_descriptor);
sandbox_set_as_error(sandbox, sandbox->state);
}
};
@ -303,34 +259,9 @@ worker_thread_execute_epoll_loop(void)
}
}
/* Initialize per-thread async I/O state: a libuv loop under USE_HTTP_UVIO, otherwise an epoll instance.
 * Fixed: `(void)` parameter list — `()` declares an unspecified-argument function in C. */
static inline void
worker_thread_initialize_async_io(void)
{
#ifdef USE_HTTP_UVIO
	worker_thread_is_in_libuv_event_loop = false;
	/* Initialize libuv event loop handle */
	uv_loop_init(&worker_thread_uvio_handle);
#else
	/* Initialize epoll */
	worker_thread_epoll_file_descriptor = epoll_create1(0);
	if (unlikely(worker_thread_epoll_file_descriptor < 0)) panic_err();
#endif
}
/* Run one round of pending async I/O for this worker thread.
 * Fixed: `(void)` parameter list — `()` declares an unspecified-argument function in C. */
static inline void
worker_thread_process_async_io(void)
{
#ifdef USE_HTTP_UVIO
	/* Execute libuv event loop, unless we are already inside it */
	if (!worker_thread_is_in_libuv_event_loop) worker_thread_execute_libuv_event_loop();
#else
	/* Execute non-blocking epoll_wait to add sandboxes back on the runqueue */
	worker_thread_execute_epoll_loop();
#endif
}
/**
* The entry function for sandbox worker threads
* Initializes thread-local state, unmasks signals, sets up libuv loop and
* Initializes thread-local state, unmasks signals, sets up epoll loop and
* @param return_code - argument provided by pthread API. We set to -1 on error
*/
void *
@ -369,7 +300,9 @@ worker_thread_main(void *return_code)
#endif
signal(SIGPIPE, SIG_IGN);
worker_thread_initialize_async_io();
/* Initialize epoll */
worker_thread_epoll_file_descriptor = epoll_create1(0);
if (unlikely(worker_thread_epoll_file_descriptor < 0)) panic_err();
/* Begin Worker Execution Loop */
struct sandbox *next_sandbox;
@ -377,7 +310,7 @@ worker_thread_main(void *return_code)
/* Assumption: current_sandbox should be unset at start of loop */
assert(current_sandbox_get() == NULL);
worker_thread_process_async_io();
worker_thread_execute_epoll_loop();
/* Switch to a sandbox if one is ready to run */
software_interrupt_disable();

Loading…
Cancel
Save