Compare commits

...

1 Commits

Author SHA1 Message Date
Sean McBride a6649365a5 refactor: WIP replace vec_u8 with memstream
3 years ago

@ -15,6 +15,7 @@
#include "http_parser.h"
#include "http_parser_settings.h"
#include "tenant.h"
#include "unistd.h"
#include "vec.h"
#include "http_session_perf_log.h"
@ -46,7 +47,9 @@ struct http_session {
int socket;
struct http_parser http_parser;
struct http_request http_request;
struct vec_u8 request_buffer;
FILE *request_handle;
char *request_buffer;
size_t request_size;
char response_header[HTTP_SESSION_RESPONSE_HEADER_CAPACITY];
size_t response_header_length;
size_t response_header_written;
@ -95,8 +98,8 @@ http_session_init(struct http_session *session, int socket_descriptor, const str
http_session_parser_init(session);
int rc = vec_u8_init(&session->request_buffer, HTTP_SESSION_DEFAULT_REQUEST_RESPONSE_SIZE);
if (rc < 0) return -1;
session->request_handle = open_memstream(&session->request_buffer, &session->request_size);
if (session->request_handle == NULL) return -1;
/* Defer allocating response until we've matched a route */
session->response_buffer.buffer = NULL;
@ -114,7 +117,13 @@ http_session_init_response_buffer(struct http_session *session, size_t capacity)
int rc = vec_u8_init(&session->response_buffer, capacity);
if (rc < 0) {
vec_u8_deinit(&session->request_buffer);
fclose(session->request_handle);
session->request_handle = NULL;
if (session->request_buffer != NULL) {
free(session->request_buffer);
session->request_buffer = NULL;
}
session->request_size = 0;
return -1;
}
@ -149,7 +158,14 @@ http_session_deinit(struct http_session *session)
{
assert(session);
vec_u8_deinit(&session->request_buffer);
fclose(session->request_handle);
session->request_handle = NULL;
if (session->request_buffer != NULL) {
free(session->request_buffer);
session->request_buffer = NULL;
}
session->request_size = 0;
vec_u8_deinit(&session->response_buffer);
}
@ -253,45 +269,6 @@ http_session_send_response_body(struct http_session *session, void_star_cb on_ea
return 0;
}
static inline bool
http_session_request_buffer_is_full(struct http_session *session)
{
return session->request_buffer.length == session->request_buffer.capacity;
}
static inline int
http_session_request_buffer_grow(struct http_session *session)
{
/* We have not yet fully parsed the header, so we don't know content-length, so just grow
* (double) the buffer */
uint8_t *old_buffer = session->request_buffer.buffer;
if (vec_u8_grow(&session->request_buffer) != 0) {
debuglog("Failed to grow request buffer\n");
return -1;
}
/* buffer moved, so invalidate to reparse */
if (old_buffer != session->request_buffer.buffer) { http_session_parser_init(session); }
return 0;
}
static inline int
http_session_request_buffer_resize(struct http_session *session, int required_size)
{
uint8_t *old_buffer = session->request_buffer.buffer;
if (vec_u8_resize(&session->request_buffer, required_size) != 0) {
debuglog("Failed to resize request vector to %d bytes\n", required_size);
return -1;
}
/* buffer moved, so invalidate to reparse */
if (old_buffer != session->request_buffer.buffer) { http_session_parser_init(session); }
return 0;
}
typedef void (*http_session_cb)(struct http_session *);
static inline ssize_t
@ -359,8 +336,7 @@ static inline int
http_session_receive_request(struct http_session *session, void_star_cb on_eagain)
{
assert(session != NULL);
assert(session->request_buffer.capacity > 0);
assert(session->request_buffer.length <= session->request_buffer.capacity);
assert(session->request_handle != NULL);
assert(session->state == HTTP_SESSION_INITIALIZED || session->state == HTTP_SESSION_RECEIVE_REQUEST_BLOCKED);
session->state = HTTP_SESSION_RECEIVING_REQUEST;
@ -368,25 +344,7 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai
int rc = 0;
while (!session->http_request.message_end) {
/* If we know the header size and content-length, resize exactly. Otherwise double */
if (session->http_request.header_end && session->http_request.body) {
int header_size = (uint8_t *)session->http_request.body - session->request_buffer.buffer;
int required_size = header_size + session->http_request.body_length;
if (required_size > session->request_buffer.capacity) {
rc = http_session_request_buffer_resize(session, required_size);
if (rc != 0) goto err_nobufs;
}
} else if (http_session_request_buffer_is_full(session)) {
rc = http_session_request_buffer_grow(session);
if (rc != 0) goto err_nobufs;
}
ssize_t bytes_received =
tcp_session_recv(session->socket,
(char *)&session->request_buffer.buffer[session->request_buffer.length],
session->request_buffer.capacity - session->request_buffer.length, on_eagain,
session);
ssize_t bytes_received = tcp_session_recv(session->socket, session->request_handle, on_eagain, session);
if (unlikely(bytes_received == -EAGAIN))
goto err_eagain;
else if (unlikely(bytes_received < 0))
@ -396,9 +354,6 @@ http_session_receive_request(struct http_session *session, void_star_cb on_eagai
goto err;
assert(bytes_received > 0);
assert(session->request_buffer.length < session->request_buffer.capacity);
session->request_buffer.length += bytes_received;
ssize_t bytes_parsed = http_session_parse(session, bytes_received);
if (bytes_parsed == -1) goto err;

@ -59,7 +59,7 @@ tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_
}
/**
* Writes buffer to the client socket
* Reads client socket into memstream
* @param client_socket - the client
* @param buffer - buffer to reach the socket into
* @param buffer_len - buffer to reach the socket into
@ -67,20 +67,31 @@ tcp_session_send(int client_socket, const char *buffer, size_t buffer_len, void_
* @returns nwritten on success, -errno on error, -eagain on block
*/
static inline ssize_t
tcp_session_recv(int client_socket, char *buffer, size_t buffer_len, void_star_cb on_eagain, void *dataptr)
tcp_session_recv(int client_socket, FILE *handle, void_star_cb on_eagain, void *dataptr)
{
assert(buffer != NULL);
assert(buffer_len > 0);
assert(handle != NULL);
ssize_t received = read(client_socket, buffer, buffer_len);
if (received < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (on_eagain != NULL) on_eagain(dataptr);
return -EAGAIN;
} else {
return -errno;
char buf[BUFSIZ];
ssize_t received = 0;
do {
received = read(client_socket, buf, BUFSIZ);
if (received < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (on_eagain != NULL) on_eagain(dataptr);
return -EAGAIN;
} else {
return -errno;
}
}
}
/* TODO: Validate memory stream error handling */
size_t written = fwrite(buf, 1, received, handle);
assert(written == received);
} while (received != 0);
int rc = fflush(handle);
if (rc < 0) { return -errno; };
return received;
}

Loading…
Cancel
Save