Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

socket related from network_framework_integration branch #662

Draft
wants to merge 39 commits into
base: grand_dispatch_queue
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
14a3386
socket related from network_framework_integration branch
sbSteveK Jul 29, 2024
9715a3e
Merge branch 'grand_dispatch_queue' into nw_socket
sbSteveK Jul 29, 2024
b830a86
missed s_socket_listen
sbSteveK Jul 29, 2024
cccbda2
move aws_socket_init_poll_based platform not supported function into …
sbSteveK Jul 30, 2024
caac9a5
small cleanups/comments
sbSteveK Jul 30, 2024
eb59ff1
Merge branch 'grand_dispatch_queue' into nw_socket
sbSteveK Jul 30, 2024
8a794de
nw_socket.c changes
sbSteveK Aug 8, 2024
553d45f
add nw_connection_t to nw_socket
sbSteveK Aug 9, 2024
cf610e9
read from socket works
sbSteveK Aug 12, 2024
07bac64
remove prints
sbSteveK Aug 13, 2024
dd1fbf2
trivial edits
sbSteveK Aug 13, 2024
2fd514c
check correct vtable func
sbSteveK Aug 13, 2024
2a0da42
clang format
sbSteveK Aug 13, 2024
9cc6620
socket_test add a manual way to set event_loop_style in options
sbSteveK Aug 13, 2024
e3281ee
event_loop add undefined event loop style and clang format
sbSteveK Aug 13, 2024
43fd436
clang format
sbSteveK Aug 13, 2024
1c1cd02
event_loop.c clang formatting and configurations
sbSteveK Aug 13, 2024
88f6de3
formatting
sbSteveK Aug 13, 2024
cf53cc6
format
sbSteveK Aug 13, 2024
a7ab224
macos errors
sbSteveK Aug 13, 2024
62fd06d
fix test
sbSteveK Aug 13, 2024
cbb8c42
formatting
sbSteveK Aug 13, 2024
4ce33ee
test fix
sbSteveK Aug 13, 2024
29ab896
prototype void
sbSteveK Aug 13, 2024
f9cd5d0
fix style func
sbSteveK Aug 13, 2024
2e55d2d
sprintf -> snprintf
sbSteveK Aug 13, 2024
6938bc3
manual default change for testing
sbSteveK Aug 21, 2024
4658492
Merge branch 'grand_dispatch_queue' into nw_socket
sbSteveK Sep 3, 2024
ef8d53f
Merge branch 'grand_dispatch_queue' of https://github.com/awslabs/aws…
xiazhvera Sep 11, 2024
731ba49
setup connection timeout
xiazhvera Sep 13, 2024
bac8b07
Merge branch 'grand_dispatch_queue' of github.com:awslabs/aws-c-io in…
xiazhvera Sep 17, 2024
429bf26
remove debug changes
xiazhvera Sep 17, 2024
2282f8f
Branched nw_socket work (#674)
sbSteveK Sep 24, 2024
8e7b351
veriify_peer spelling
sbSteveK Sep 24, 2024
54e58e9
remove redundant secure_transport_ctx
sbSteveK Sep 24, 2024
b01b510
channel_args creation simplification
sbSteveK Sep 24, 2024
5b5a953
empty line
sbSteveK Sep 24, 2024
b7f13df
clang formatting
sbSteveK Sep 24, 2024
b325809
more clang
sbSteveK Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ Simply call the `aws_client_bootstrap_new_socket_channel` `aws_server_bootstrap_
### Event Loop
Core to Async-IO is the event-loop. We provide an implementation for most platforms out of the box:

<!-- DEBUG WIP we need to insert Dispatch Queue here for iOS and specify kQueue is for macOS -->
Platform | Implementation
--- | ---
Linux | Edge-Triggered Epoll
Expand Down
52 changes: 52 additions & 0 deletions include/aws/io/private/dispatch_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H
#define AWS_IO_PRIVATE_DISPATCH_QUEUE_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <Security/Security.h>
#include <aws/io/tls_channel_handler.h>
#include <dispatch/dispatch.h>

struct secure_transport_ctx {
struct aws_tls_ctx ctx;
CFAllocatorRef wrapped_allocator;
CFArrayRef certs;
SecIdentityRef secitem_identity;
CFArrayRef ca_cert;
enum aws_tls_versions minimum_version;
struct aws_string *alpn_list;
bool verify_peer;
};

struct dispatch_scheduling_state {
// Let's us skip processing an iteration task if one is already in the middle
// of executing
bool is_executing_iteration;

// List<scheduled_service_entry> in sorted order by timestamp
//
// When we go to schedule a new iteration, we check here first to see
// if our scheduling attempt is redundant
struct aws_linked_list scheduled_services;
};

struct dispatch_loop {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;

struct {
struct dispatch_scheduling_state scheduling_state;
struct aws_linked_list cross_thread_tasks;
struct aws_mutex lock;
bool suspended;
} synced_data;

bool wakeup_schedule_needed;
};

#endif /* #ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H */
20 changes: 20 additions & 0 deletions include/aws/io/private/socket.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef AWS_IO_PRIVATE_SOCKET_H
#define AWS_IO_PRIVATE_SOCKET_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/io/socket.h>

int aws_socket_init_poll_based(
struct aws_socket *socket,
struct aws_allocator *alloc,
const struct aws_socket_options *options);

int aws_socket_init_completion_port_based(
struct aws_socket *socket,
struct aws_allocator *alloc,
const struct aws_socket_options *options);

#endif /* #ifndef AWS_IO_PRIVATE_SOCKET_H */
50 changes: 49 additions & 1 deletion include/aws/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

#include <aws/io/channel.h>
#include <aws/io/event_loop.h>
#include <aws/io/io.h>

AWS_PUSH_SANE_WARNING_LEVEL
Expand Down Expand Up @@ -44,6 +45,7 @@ struct aws_socket_options {
/* If set, sets the number of keep alive probes allowed to fail before the connection is considered
* lost. If zero OS defaults are used. On Windows, this option is meaningless until Windows 10 1703.*/
uint16_t keep_alive_max_failed_probes;
enum aws_event_loop_style event_loop_style;
bool keepalive;

/**
Expand Down Expand Up @@ -114,7 +116,44 @@ struct aws_socket_endpoint {
uint32_t port;
};

struct aws_socket;

struct aws_socket_vtable {
void (*socket_cleanup_fn)(struct aws_socket *socket);
int (*socket_connect_fn)(
struct aws_socket *socket,
const struct aws_socket_endpoint *remote_endpoint,
struct aws_event_loop *event_loop,
aws_socket_on_connection_result_fn *on_connection_result,
void *user_data);
int (*socket_bind_fn)(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint);
int (*socket_listen_fn)(struct aws_socket *socket, int backlog_size);
int (*socket_start_accept_fn)(
struct aws_socket *socket,
struct aws_event_loop *accept_loop,
aws_socket_on_accept_result_fn *on_accept_result,
void *user_data);
int (*socket_stop_accept_fn)(struct aws_socket *socket);
int (*socket_close_fn)(struct aws_socket *socket);
int (*socket_shutdown_dir_fn)(struct aws_socket *socket, enum aws_channel_direction dir);
int (*socket_set_options_fn)(struct aws_socket *socket, const struct aws_socket_options *options);
int (*socket_assign_to_event_loop_fn)(struct aws_socket *socket, struct aws_event_loop *event_loop);
int (*socket_subscribe_to_readable_events_fn)(
struct aws_socket *socket,
aws_socket_on_readable_fn *on_readable,
void *user_data);
int (*socket_read_fn)(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read);
int (*socket_write_fn)(
struct aws_socket *socket,
const struct aws_byte_cursor *cursor,
aws_socket_on_write_completed_fn *written_fn,
void *user_data);
int (*socket_get_error_fn)(struct aws_socket *socket);
bool (*socket_is_open_fn)(struct aws_socket *socket);
};

struct aws_socket {
struct aws_socket_vtable *vtable;
struct aws_allocator *allocator;
struct aws_socket_endpoint local_endpoint;
struct aws_socket_endpoint remote_endpoint;
Expand All @@ -123,6 +162,7 @@ struct aws_socket {
struct aws_event_loop *event_loop;
struct aws_channel_handler *handler;
int state;
enum aws_event_loop_style event_loop_style;
aws_socket_on_readable_fn *readable_fn;
void *readable_user_data;
aws_socket_on_connection_result_fn *connection_result_fn;
Expand All @@ -147,6 +187,14 @@ aws_ms_fn_ptr aws_winsock_get_acceptex_fn(void);

AWS_EXTERN_C_BEGIN

AWS_IO_API struct aws_socket_options aws_socket_options_default_tcp_ipv6(enum aws_event_loop_style el_style);
AWS_IO_API struct aws_socket_options aws_socket_options_default_tcp_ipv4(enum aws_event_loop_style el_style);

AWS_IO_API struct aws_socket_options aws_socket_options_default_udp_ipv6(enum aws_event_loop_style el_style);
AWS_IO_API struct aws_socket_options aws_socket_options_default_udp_ipv4(enum aws_event_loop_style el_style);

AWS_IO_API struct aws_socket_options aws_socket_options_default_local(enum aws_event_loop_style el_style);

/**
* Initializes a socket object with socket options. options will be copied.
*/
Expand Down Expand Up @@ -260,7 +308,7 @@ AWS_IO_API int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct
AWS_IO_API struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket);

/**
* Subscribes on_readable to notifications when the socket goes readable (edge-triggered). Errors will also be recieved
* Subscribes on_readable to notifications when the socket goes readable (edge-triggered). Errors will also be received
* in the callback.
*
* Note! This function is technically not thread safe, but we do not enforce which thread you call from.
Expand Down
23 changes: 11 additions & 12 deletions source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ static bool s_aws_socket_domain_uses_dns(enum aws_socket_domain domain) {
return domain == AWS_SOCKET_IPV4 || domain == AWS_SOCKET_IPV6;
}

/* Called when a socket connection attempt task completes. First socket to successfully open
* assigns itself to connection_args->channel_data.socket and flips connection_args->connection_chosen
* to true. Subsequent successful sockets will be released and cleaned up
*/
static void s_on_client_connection_established(struct aws_socket *socket, int error_code, void *user_data) {
struct client_connection_args *connection_args = user_data;

Expand Down Expand Up @@ -565,7 +569,6 @@ static void s_on_client_connection_established(struct aws_socket *socket, int er
(void *)connection_args->bootstrap,
(void *)socket);
aws_socket_close(socket);

aws_socket_clean_up(socket);
aws_mem_release(connection_args->bootstrap->allocator, socket);

Expand Down Expand Up @@ -813,8 +816,9 @@ int aws_client_bootstrap_new_socket_channel(struct aws_socket_channel_bootstrap_
AWS_FATAL_ASSERT(options->shutdown_callback);
AWS_FATAL_ASSERT(bootstrap);

const struct aws_socket_options *socket_options = options->socket_options;
struct aws_socket_options *socket_options = (struct aws_socket_options *)options->socket_options;
AWS_FATAL_ASSERT(socket_options != NULL);
socket_options->event_loop_style = aws_event_loop_group_get_style(bootstrap->event_loop_group);

const struct aws_tls_connection_options *tls_options = options->tls_options;

Expand All @@ -832,10 +836,6 @@ int aws_client_bootstrap_new_socket_channel(struct aws_socket_channel_bootstrap_
struct client_connection_args *client_connection_args =
aws_mem_calloc(bootstrap->allocator, 1, sizeof(struct client_connection_args));

if (!client_connection_args) {
return AWS_OP_ERR;
}

const char *host_name = options->host_name;
uint32_t port = options->port;

Expand Down Expand Up @@ -1361,9 +1361,7 @@ void s_on_server_connection_result(
(void *)socket);
struct server_channel_data *channel_data =
aws_mem_calloc(connection_args->bootstrap->allocator, 1, sizeof(struct server_channel_data));
if (!channel_data) {
goto error_cleanup;
}

channel_data->incoming_called = false;
channel_data->socket = new_socket;
channel_data->server_connection_args = connection_args;
Expand All @@ -1376,11 +1374,10 @@ void s_on_server_connection_result(
.setup_user_data = channel_data,
.shutdown_user_data = channel_data,
.on_shutdown_completed = s_on_server_channel_on_shutdown,
.event_loop = event_loop,
.enable_read_back_pressure = channel_data->server_connection_args->enable_read_back_pressure,
};

channel_args.event_loop = event_loop;
channel_args.enable_read_back_pressure = channel_data->server_connection_args->enable_read_back_pressure;

if (aws_socket_assign_to_event_loop(new_socket, event_loop)) {
aws_mem_release(connection_args->bootstrap->allocator, (void *)channel_data);
goto error_cleanup;
Expand Down Expand Up @@ -1497,6 +1494,8 @@ struct aws_socket *aws_server_bootstrap_new_socket_listener(
struct aws_event_loop *connection_loop =
aws_event_loop_group_get_next_loop(bootstrap_options->bootstrap->event_loop_group);

((struct aws_socket_options *)bootstrap_options->socket_options)->event_loop_style =
aws_event_loop_group_get_style(bootstrap_options->bootstrap->event_loop_group);
if (aws_socket_init(
&server_connection_args->listener,
bootstrap_options->bootstrap->allocator,
Expand Down
30 changes: 1 addition & 29 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <unistd.h>

#include <Block.h>
#include <aws/io/private/dispatch_queue.h>
#include <dispatch/dispatch.h>
#include <dispatch/queue.h>

Expand Down Expand Up @@ -46,42 +47,13 @@ static struct aws_event_loop_vtable s_vtable = {
.is_on_callers_thread = s_is_on_callers_thread,
};

struct dispatch_scheduling_state {
// Let's us skip processing an iteration task if one is already in the middle
// of executing
bool is_executing_iteration;

// List<scheduled_service_entry> in sorted order by timestamp
//
// When we go to schedule a new iteration, we check here first to see
// if our scheduling attempt is redundant
struct aws_linked_list scheduled_services;
};

struct scheduled_service_entry {
struct aws_allocator *allocator;
uint64_t timestamp;
struct aws_linked_list_node node;
struct aws_event_loop *loop; // might eventually need to be ref-counted for cleanup?
};

struct dispatch_loop {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_linked_list local_cross_thread_tasks;

struct {
struct dispatch_scheduling_state scheduling_state;
struct aws_linked_list cross_thread_tasks;
struct aws_mutex lock;
bool suspended;
} synced_data;

bool wakeup_schedule_needed;
};

struct scheduled_service_entry *scheduled_service_entry_new(struct aws_event_loop *loop, uint64_t timestamp) {
struct scheduled_service_entry *entry = aws_mem_calloc(loop->alloc, 1, sizeof(struct scheduled_service_entry));

Expand Down
Loading
Loading