Skip to content

Commit

Permalink
use testing_channel_drain_task_queue() (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
graebm committed May 2, 2019
1 parent 3e4eb7c commit 8c70506
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 34 deletions.
4 changes: 2 additions & 2 deletions codebuild/common-posix.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ if [ "$TRAVIS_OS_NAME" != "osx" ]; then
sudo apt-get install libssl-dev -y
install_library s2n 7c9069618e68214802ac7fbf45705d5f8b53135f
fi
install_library aws-c-common v0.3.7
install_library aws-c-io v0.3.5
install_library aws-c-common
install_library aws-c-io

mkdir -p build
pushd build
Expand Down
5 changes: 5 additions & 0 deletions source/connection_h1.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,11 @@ static void s_update_window_task(struct aws_channel_task *channel_task, void *ar
static void s_stream_update_window(struct aws_http_stream *stream, size_t increment_size) {
struct h1_connection *connection = AWS_CONTAINER_OF(stream->owning_connection, struct h1_connection, base);

if (increment_size == 0) {
AWS_LOGF_TRACE(AWS_LS_HTTP_CONNECTION, "id=%p: Ignoring window update of size 0.", (void *)&connection->base);
return;
}

/* If we're on the thread, just do it. */
if (aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)) {
AWS_LOGF_TRACE(
Expand Down
83 changes: 51 additions & 32 deletions tests/test_h1_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ static int s_tester_init(struct tester *tester, struct aws_allocator *alloc) {

aws_channel_acquire_hold(tester->testing_channel.channel);

testing_channel_drain_queued_tasks(&tester->testing_channel);

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -150,7 +152,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_1liner) {
struct aws_http_stream *stream = aws_http_stream_new_client_request(&opt);
ASSERT_NOT_NULL(stream);

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
const char *expected = "GET / HTTP/1.1\r\n"
Expand Down Expand Up @@ -190,7 +192,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_headers) {
struct aws_http_stream *stream = aws_http_stream_new_client_request(&opt);
ASSERT_NOT_NULL(stream);

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
const char *expected = "GET / HTTP/1.1\r\n"
Expand Down Expand Up @@ -255,7 +257,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_body) {
struct aws_http_stream *stream = aws_http_stream_new_client_request(&opt);
ASSERT_NOT_NULL(stream);

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
const char *expected = "PUT /plan.txt HTTP/1.1\r\n"
Expand Down Expand Up @@ -290,7 +292,7 @@ static int s_check_multiple_messages(struct tester *tester, struct aws_byte_curs
while (remaining > 0) {
/* Tick event loop if there are no messages already */
if (aws_linked_list_empty(msgs)) {
testing_channel_execute_queued_tasks(&tester->testing_channel);
testing_channel_run_currently_queued_tasks(&tester->testing_channel);
}

/* There should be EXACTLY 1 aws_io_message after ticking. */
Expand All @@ -317,7 +319,7 @@ static int s_check_multiple_messages(struct tester *tester, struct aws_byte_curs
}

/* Check that no more messages are produced unexpectedly */
testing_channel_execute_queued_tasks(&tester->testing_channel);
testing_channel_drain_queued_tasks(&tester->testing_channel);
ASSERT_TRUE(aws_linked_list_empty(msgs));

*out_num_messages = num_messages;
Expand Down Expand Up @@ -470,7 +472,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_multiple_in_1_io_message) {
ASSERT_NOT_NULL(streams[i]);
}

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
const char *expected = "GET / HTTP/1.1\r\n"
Expand Down Expand Up @@ -676,11 +678,13 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_1liner) {
struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
ASSERT_SUCCESS(s_send_response_str(&tester, "HTTP/1.1 204 No Content\r\n\r\n"));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
ASSERT_TRUE(response.on_complete_cb_count == 1);
ASSERT_TRUE(response.on_complete_error_code == AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -719,7 +723,7 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_headers) {
struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
ASSERT_SUCCESS(s_send_response_str(
Expand All @@ -729,6 +733,8 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_headers) {
"Location: /index.html\r\n"
"\r\n"));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
ASSERT_TRUE(response.on_complete_cb_count == 1);
ASSERT_TRUE(response.on_complete_error_code == AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -759,7 +765,7 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_body) {
struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
ASSERT_SUCCESS(s_send_response_str(
Expand All @@ -769,6 +775,8 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_body) {
"\r\n"
"Call Momo"));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
ASSERT_TRUE(response.on_complete_cb_count == 1);
ASSERT_TRUE(response.on_complete_error_code == AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -799,7 +807,7 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_1_from_multiple_io_messages) {
struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response with each byte in its own aws_io_message */
const char *response_str = "HTTP/1.1 200 OK\r\n"
Expand All @@ -811,6 +819,8 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_1_from_multiple_io_messages) {
s_send_response(&tester, aws_byte_cursor_from_array(response_str + i, 1));
}

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
ASSERT_TRUE(response.on_complete_cb_count == 1);
ASSERT_TRUE(response.on_complete_error_code == AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -841,9 +851,8 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_multiple_from_1_io_message) {
struct response_tester responses[3];
for (size_t i = 0; i < AWS_ARRAY_SIZE(responses); ++i) {
ASSERT_SUCCESS(s_response_tester_init(&responses[i], allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
}
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send all responses in a single aws_io_message */
ASSERT_SUCCESS(s_send_response_str(
Expand All @@ -852,6 +861,8 @@ H1_CLIENT_TEST_CASE(h1_client_response_get_multiple_from_1_io_message) {
"HTTP/1.1 204 No Content\r\n\r\n"
"HTTP/1.1 204 No Content\r\n\r\n"));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check results */
for (size_t i = 0; i < AWS_ARRAY_SIZE(responses); ++i) {
ASSERT_TRUE(responses[i].on_complete_cb_count == 1);
Expand Down Expand Up @@ -882,12 +893,12 @@ H1_CLIENT_TEST_CASE(h1_client_response_with_bad_data_shuts_down_connection) {
struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
ASSERT_SUCCESS(s_send_response_str_ignore_errors(&tester, "Mmmm garbage data\r\n\r\n"));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
ASSERT_TRUE(response.on_complete_cb_count == 1);
Expand All @@ -914,7 +925,7 @@ H1_CLIENT_TEST_CASE(h1_client_response_with_too_much_data_shuts_down_connection)

struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send 2 responses in a single aws_io_message. */
ASSERT_SUCCESS(s_send_response_ex(
Expand All @@ -923,6 +934,8 @@ H1_CLIENT_TEST_CASE(h1_client_response_with_too_much_data_shuts_down_connection)
"HTTP/1.1 204 No Content\r\n\r\n"),
true /* ignore send errors */));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* 1st response should have come across successfully */
ASSERT_TRUE(response.on_complete_cb_count == 1);
ASSERT_TRUE(response.on_complete_error_code == AWS_ERROR_SUCCESS);
Expand All @@ -933,7 +946,7 @@ H1_CLIENT_TEST_CASE(h1_client_response_with_too_much_data_shuts_down_connection)
ASSERT_SUCCESS(s_response_tester_clean_up(&response));

/* extra data should have caused channel shutdown */
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);
ASSERT_TRUE(tester.is_shut_down);
ASSERT_TRUE(tester.shutdown_error_code != AWS_ERROR_SUCCESS);

Expand Down Expand Up @@ -1009,14 +1022,17 @@ H1_CLIENT_TEST_CASE(h1_client_response_arrives_before_request_done_sending_is_ok
ASSERT_SUCCESS(s_response_tester_init_ex(&response, allocator, &opt, &body_sender));

/* send head of request */
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_run_currently_queued_tasks(&tester.testing_channel);

/* send response */
ASSERT_SUCCESS(s_send_response_str(&tester, "HTTP/1.1 200 OK\r\n\r\n"));

/* tick loop until body finishes sending.*/
while (body_sender.cursor.len > 0) {
testing_channel_execute_queued_tasks(&tester.testing_channel);
/* on_complete shouldn't fire until all outgoing data sent AND all incoming data received */
ASSERT_TRUE(response.on_complete_cb_count == 0);

testing_channel_run_currently_queued_tasks(&tester.testing_channel);
}

/* check result */
Expand Down Expand Up @@ -1046,7 +1062,7 @@ H1_CLIENT_TEST_CASE(h1_client_response_without_request_shuts_down_connection) {
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

ASSERT_SUCCESS(s_send_response_str_ignore_errors(&tester, "HTTP/1.1 200 OK\r\n\r\n"));
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

ASSERT_TRUE(tester.is_shut_down);
ASSERT_TRUE(tester.shutdown_error_code != AWS_ERROR_SUCCESS);
Expand All @@ -1071,7 +1087,7 @@ H1_CLIENT_TEST_CASE(h1_client_window_reopens_by_default) {
struct response_tester response;
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
const char *response_str = "HTTP/1.1 200 OK\r\n"
Expand All @@ -1080,6 +1096,8 @@ H1_CLIENT_TEST_CASE(h1_client_window_reopens_by_default) {
"Call Momo";
ASSERT_SUCCESS(s_send_response_str(&tester, response_str));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
size_t window_update = testing_channel_last_window_update(&tester.testing_channel);
ASSERT_TRUE(window_update == strlen(response_str));
Expand All @@ -1106,7 +1124,7 @@ H1_CLIENT_TEST_CASE(h1_client_window_shrinks_if_user_says_so) {
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));
response.stop_auto_window_update = true;

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
const char *response_str = "HTTP/1.1 200 OK\r\n"
Expand All @@ -1115,6 +1133,8 @@ H1_CLIENT_TEST_CASE(h1_client_window_shrinks_if_user_says_so) {
"Call Momo";
ASSERT_SUCCESS(s_send_response_str(&tester, response_str));

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
size_t window_update = testing_channel_last_window_update(&tester.testing_channel);
size_t message_sans_body = strlen(response_str) - 9;
Expand All @@ -1141,7 +1161,7 @@ static int s_window_update(struct aws_allocator *allocator, bool on_thread) {
ASSERT_SUCCESS(s_response_tester_init(&response, allocator, &opt));
response.stop_auto_window_update = true;

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
const char *response_str = "HTTP/1.1 200 OK\r\n"
Expand All @@ -1151,7 +1171,7 @@ static int s_window_update(struct aws_allocator *allocator, bool on_thread) {
ASSERT_SUCCESS(s_send_response_str(&tester, response_str));

/* drain the task queue, in case there's an update window task in there from the headers */
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result */
if (!on_thread) {
Expand All @@ -1162,9 +1182,9 @@ static int s_window_update(struct aws_allocator *allocator, bool on_thread) {

if (!on_thread) {
testing_channel_set_is_on_users_thread(&tester.testing_channel, true);
testing_channel_execute_queued_tasks(&tester.testing_channel);
}
testing_channel_execute_queued_tasks(&tester.testing_channel);

testing_channel_drain_queued_tasks(&tester.testing_channel);

size_t window_update = testing_channel_last_window_update(&tester.testing_channel);
ASSERT_INT_EQUALS(9, window_update);
Expand Down Expand Up @@ -1208,11 +1228,11 @@ H1_CLIENT_TEST_CASE(h1_client_request_cancelled_by_channel_shutdown) {
struct aws_http_stream *stream = aws_http_stream_new_client_request(&opt);
ASSERT_NOT_NULL(stream);

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* shutdown channel before request completes */
aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_SUCCESS);
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* even though the channel shut down with error_code 0,
* the stream should not get code 0 because it did not complete successfully */
Expand Down Expand Up @@ -1247,7 +1267,7 @@ H1_CLIENT_TEST_CASE(h1_client_multiple_requests_cancelled_by_channel_shutdown) {
}

/* 2 streams are now in-progress */
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* Make 1 more stream that's still locked away in the pending queue */
opt.user_data = &completion_error_codes[2];
Expand All @@ -1256,7 +1276,7 @@ H1_CLIENT_TEST_CASE(h1_client_multiple_requests_cancelled_by_channel_shutdown) {

/* shutdown channel */
aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_SUCCESS);
testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check results */
for (int i = 0; i < 3; ++i) {
Expand Down Expand Up @@ -1379,7 +1399,6 @@ static void s_close_from_stream_complete(struct aws_http_stream *stream, int err
static int s_test_close_from_callback(struct aws_allocator *allocator, enum request_callback close_at) {
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));
testing_channel_execute_queued_tasks(&tester.testing_channel);

struct close_from_callback_tester close_tester = {
.close_at = close_at,
Expand Down Expand Up @@ -1409,7 +1428,7 @@ static int s_test_close_from_callback(struct aws_allocator *allocator, enum requ
struct aws_http_stream *stream = aws_http_stream_new_client_request(&opt);
ASSERT_NOT_NULL(stream);

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* send response */
ASSERT_SUCCESS(s_send_response_str_ignore_errors(
Expand All @@ -1425,7 +1444,7 @@ static int s_test_close_from_callback(struct aws_allocator *allocator, enum requ
"0\r\n"
"\r\n"));

testing_channel_execute_queued_tasks(&tester.testing_channel);
testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check that callbacks were invoked before close_at, but not after */
for (int i = 0; i < REQUEST_CALLBACK_COMPLETE; ++i) {
Expand Down

0 comments on commit 8c70506

Please sign in to comment.