Skip to content

Commit

Permalink
fix completion callback thread tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Sep 24, 2024
1 parent bd6e603 commit b373936
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 21 deletions.
2 changes: 2 additions & 0 deletions include/aws/io/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ struct dispatch_loop {
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;
aws_thread_id_t m_current_thread_id;
bool processing;

struct {
struct dispatch_scheduling_state scheduling_state;
Expand Down
18 changes: 15 additions & 3 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop);

struct dispatch_loop *dispatch_loop = event_loop->impl_data;
dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = true;

/* make sure the loop is running so we can schedule a last task. */
s_run(event_loop);
Expand Down Expand Up @@ -249,6 +251,9 @@ static void s_destroy(struct aws_event_loop *event_loop) {

dispatch_loop->synced_data.suspended = true;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = false;
});

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Releasing Dispatch Queue.", (void *)event_loop);
Expand Down Expand Up @@ -386,14 +391,20 @@ void run_iteration(void *context) {
aws_task_scheduler_schedule_future(&dispatch_loop->scheduler, task, task->timestamp);
}
}


dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = true;

// run all scheduled tasks
uint64_t now_ns = 0;
aws_event_loop_current_clock_time(event_loop, &now_ns);
aws_task_scheduler_run_all(&dispatch_loop->scheduler, now_ns);
aws_event_loop_register_tick_end(event_loop);

end_iteration(entry);

dispatch_loop->m_current_thread_id = aws_thread_current_thread_id();
dispatch_loop->processing = false;
}

// Checks if a new iteration task needs to be scheduled, given a target timestamp
Expand Down Expand Up @@ -481,6 +492,7 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
// tasks as cross thread tasks. Ignore the caller thread verification for apple
// dispatch queue.
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
(void)event_loop;
return true;
struct dispatch_loop* dispatch_queue = event_loop->impl_data;
bool result = dispatch_queue->processing && aws_thread_thread_id_equal(dispatch_queue->m_current_thread_id, aws_thread_current_thread_id());
return result;
}
204 changes: 186 additions & 18 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,29 @@ struct nw_socket_connect_args {
struct aws_socket *socket;
};


struct nw_socket_readable_args {
int error_code;
struct aws_allocator *allocator;
struct aws_socket *socket;
};

struct nw_socket_written_args {
int error_code;
struct aws_allocator *allocator;
struct aws_socket *socket;
aws_socket_on_write_completed_fn *written_fn;
void *user_data;
size_t bytes_written;
};

struct nw_socket_cancel_task_args {
struct aws_allocator *allocator;
struct aws_socket *socket;
struct aws_task* task_to_cancel;
};


struct nw_socket {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
Expand Down Expand Up @@ -347,6 +370,151 @@ static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_
}
}


static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_task_status status)
{
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_readable_args * args = arg;

struct nw_socket* nw_socket = args->socket->impl;
nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}


static void s_schedule_on_readable(struct aws_socket* socket, int error_code){
struct aws_task* task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));;
struct nw_socket_readable_args* args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args));


args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;

aws_task_init(
task,
s_process_readable_task,
args,
"readableTask");


aws_event_loop_schedule_task_now(socket->event_loop, task);

}



static void s_process_connection_success_task(struct aws_task *task, void *arg, enum aws_task_status status)
{
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_readable_args * args = arg;

struct nw_socket* nw_socket = args->socket->impl;
nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}


static void s_schedule_on_connection_success(struct aws_socket* socket, int error_code){

struct aws_task* task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));;
struct nw_socket_readable_args* args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args));


args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;

aws_task_init(
task,
s_process_connection_success_task,
args,
"connectionSuccessTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}


static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status)
{
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_cancel_task_args * args = arg;

struct nw_socket* nw_socket = args->socket->impl;
if(status == AWS_TASK_STATUS_RUN_READY)
aws_event_loop_cancel_task(args->socket->event_loop, args->task_to_cancel);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}

// As cancel task has to run on the same thread & we dont have control on dispatch queue thread,
// we always schedule the cancel task on event loop
static void s_schedule_cancel_task(struct aws_socket* socket, struct aws_task* task_to_cancel){

struct aws_task* task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));;
struct nw_socket_cancel_task_args* args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_cancel_task_args));


args->socket = socket;
args->allocator = socket->allocator;
args->task_to_cancel = task_to_cancel;

aws_task_init(
task,
s_process_cancel_task,
args,
"cancelTaskTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}



static void s_process_write_task(struct aws_task *task, void *arg, enum aws_task_status status)
{
// TODO: WAHT IF THE TASK IS CANCELED???

(void)status;
struct nw_socket_written_args * args = arg;

struct nw_socket* nw_socket = args->socket->impl;
args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data);

aws_mem_release(args->allocator, task);
aws_mem_release(args->allocator, args);
}


static void s_schedule_write_fn(struct aws_socket* socket, int error_code, size_t bytes_written,
void *user_data, aws_socket_on_write_completed_fn *written_fn){

struct aws_task* task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task));;
struct nw_socket_written_args* args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_written_args));

args->socket = socket;
args->allocator = socket->allocator;
args->error_code = error_code;
args->written_fn = written_fn;
args->user_data = user_data;
args->bytes_written = bytes_written;

aws_task_init(
task,
s_process_write_task,
args,
"writtenTask");
aws_event_loop_schedule_task_now(socket->event_loop, task);
}

static int s_socket_connect_fn(
struct aws_socket *socket,
const struct aws_socket_endpoint *remote_endpoint,
Expand Down Expand Up @@ -443,8 +611,9 @@ static int s_socket_connect_fn(
return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
}

socket->io_handle.data.handle = nw_connection_create(endpoint, nw_socket->socket_options_to_params);
nw_socket->nw_connection = socket->io_handle.data.handle;

nw_socket->nw_connection = nw_connection_create(endpoint, nw_socket->socket_options_to_params);
socket->io_handle.data.handle = nw_socket->nw_connection;
nw_release(endpoint);

if (!socket->io_handle.data.handle) {
Expand Down Expand Up @@ -520,10 +689,10 @@ static int s_socket_connect_fn(
socket->state = CONNECTED_WRITE | CONNECTED_READ;

// Cancel the connection timeout task
aws_event_loop_cancel_task(event_loop, &nw_socket->connect_args->task);
s_schedule_cancel_task(socket, &nw_socket->connect_args->task);

aws_ref_count_acquire(&nw_socket->ref_count);
on_connection_result(socket, AWS_OP_SUCCESS, user_data);
s_schedule_on_connection_success(socket, AWS_OP_SUCCESS);
aws_ref_count_release(&nw_socket->ref_count);
nw_socket->setup_run = true;
} else if (error) {
Expand All @@ -536,7 +705,7 @@ static int s_socket_connect_fn(
socket->io_handle.data.handle,
error_code);
// Cancel the connection timeout task
aws_event_loop_cancel_task(event_loop, &nw_socket->connect_args->task);
s_schedule_cancel_task(socket, &nw_socket->connect_args->task);
/* we don't let this thing do DNS or TLS. Everything had better be a posix error. */
AWS_ASSERT(nw_error_get_error_domain(error) == nw_error_domain_posix);
error_code = s_determine_socket_error(error_code);
Expand All @@ -545,10 +714,10 @@ static int s_socket_connect_fn(
socket->state = ERROR;
aws_ref_count_acquire(&nw_socket->ref_count);
if (!nw_socket->setup_run) {
on_connection_result(socket, error_code, user_data);
s_schedule_on_connection_success(socket, error_code);
nw_socket->setup_run = true;
} else if (socket->readable_fn) {
socket->readable_fn(socket, nw_socket->last_error, socket->readable_user_data);
s_schedule_on_readable(socket, nw_socket->last_error);
}

aws_ref_count_release(&nw_socket->ref_count);
Expand All @@ -557,17 +726,18 @@ static int s_socket_connect_fn(
* we uninstall this handler right before calling close on the socket so this shouldn't
* get hit unless it was triggered remotely */
// Cancel the connection timeout task
aws_event_loop_cancel_task(event_loop, &nw_socket->connect_args->task);
s_schedule_cancel_task(socket, &nw_socket->connect_args->task);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle);
socket->state = CLOSED;
aws_ref_count_acquire(&nw_socket->ref_count);
aws_raise_error(AWS_IO_SOCKET_CLOSED);
if (!nw_socket->setup_run) {
on_connection_result(socket, AWS_IO_SOCKET_CLOSED, user_data);

s_schedule_on_connection_success(socket, AWS_IO_SOCKET_CLOSED);
nw_socket->setup_run = true;
} else if (socket->readable_fn) {
socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
s_schedule_on_readable(socket, AWS_IO_SOCKET_CLOSED);
}
aws_ref_count_release(&nw_socket->ref_count);
}
Expand Down Expand Up @@ -931,7 +1101,8 @@ static void s_schedule_next_read(struct aws_socket *socket) {
(void *)socket,
socket->io_handle.data.handle,
(int)dispatch_data_get_size(data));
nw_socket->on_readable(socket, AWS_ERROR_SUCCESS, nw_socket->on_readable_user_data);

s_schedule_on_readable(socket, AWS_ERROR_SUCCESS);
}
if (!is_complete) {
s_schedule_next_read(socket);
Expand All @@ -947,7 +1118,8 @@ static void s_schedule_next_read(struct aws_socket *socket) {
socket->io_handle.data.handle,
error_code);

nw_socket->on_readable(socket, error_code, nw_socket->on_readable_user_data);
s_schedule_on_readable(socket, error_code);

}
aws_ref_count_release(&nw_socket->ref_count);
});
Expand Down Expand Up @@ -1077,9 +1249,7 @@ static int s_socket_write_fn(
nw_connection_t handle = nw_socket->nw_connection;
struct dispatch_loop *dispath_event_loop = socket->event_loop->impl_data;

dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, dispath_event_loop->dispatch_queue, ^{
AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id= %p: dispatch data destructor ", (void *)cursor);
});
dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, dispath_event_loop->dispatch_queue,DISPATCH_DATA_DESTRUCTOR_DEFAULT);

AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
Expand All @@ -1097,7 +1267,6 @@ static int s_socket_write_fn(
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle);
written_fn(socket, 0, 0, user_data);
dispatch_release(data);
aws_ref_count_release(&nw_socket->ref_count);
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
Expand Down Expand Up @@ -1136,14 +1305,13 @@ static int s_socket_write_fn(
(void *)socket,
handle,
(int)written_size);
written_fn(socket, error_code, !error_code ? written_size : 0, user_data);
s_schedule_write_fn(socket, error_code, !error_code ? written_size : 0,user_data, written_fn);
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"[DEBUG]id=%p, handle=%p, [DEBUG] releasing data call, data=%p",
(void *)socket,
handle,
data);
dispatch_release(data);
aws_ref_count_release(&nw_socket->ref_count);
});

Expand Down

0 comments on commit b373936

Please sign in to comment.