diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c index 5c78c95492b..93f71b745c6 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_windows.c @@ -82,15 +82,16 @@ static void create_sockets(SOCKET sv[2]) { sv[0] = svr_sock; } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( + const char *name, grpc_resource_quota *resource_quota, + size_t read_slice_size) { SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), - "endpoint:server"); + resource_quota, "endpoint:server"); p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), - "endpoint:client"); + resource_quota, "endpoint:client"); return p; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 598c0125899..4d1e809872a 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -43,6 +43,7 @@ #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -61,13 +62,16 @@ typedef struct { int refs; grpc_closure on_connect; grpc_endpoint **endpoint; + grpc_resource_quota *resource_quota; } async_connect; -static void async_connect_unlock_and_cleanup(async_connect *ac, +static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, + async_connect *ac, grpc_winsocket *socket) { int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { + grpc_resource_quota_internal_unref(exec_ctx, ac->resource_quota); gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_name); gpr_free(ac); @@ -83,7 +87,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (socket != NULL) { grpc_winsocket_shutdown(socket); } - async_connect_unlock_and_cleanup(ac, socket); + async_connect_unlock_and_cleanup(exec_ctx, ac, socket); } static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { @@ -113,12 +117,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (!wsa_success) { error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); } else { - *ep = grpc_tcp_create(socket, ac->addr_name); + *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); socket = NULL; } } - async_connect_unlock_and_cleanup(ac, socket); + async_connect_unlock_and_cleanup(exec_ctx, ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); @@ -129,6 +133,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const grpc_resolved_address *addr, gpr_timespec deadline) { SOCKET sock = INVALID_SOCKET; @@ -144,6 +149,17 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_winsocket_callback_info *info; grpc_error *error = GRPC_ERROR_NONE; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + *endpoint = NULL; /* Use dualstack sockets where available. */ @@ -177,8 +193,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_sockaddr_make_wildcard6(0, &local_address); - status = - bind(sock, (struct sockaddr *)&local_address.addr, local_address.len); + status = bind(sock, (struct sockaddr *)&local_address.addr, + (int)local_address.len); if (status != 0) { error = GRPC_WSA_ERROR(WSAGetLastError(), "bind"); goto failure; @@ -206,6 +222,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, ac->refs = 2; ac->addr_name = grpc_sockaddr_to_uri(addr); ac->endpoint = endpoint; + ac->resource_quota = resource_quota; grpc_closure_init(&ac->on_connect, on_connect, ac); grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac, @@ -225,6 +242,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); } diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index ad6769a6ba8..ae54c70d2d0 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -100,14 +100,32 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; + + grpc_resource_quota *resource_quota; }; /* Public function. Allocates the proper data structures to hold a grpc_tcp_server. */ -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + s->resource_quota = grpc_resource_quota_create(NULL); + for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { + if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_POINTER) { + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + s->resource_quota = + grpc_resource_quota_internal_ref(args->args[i].value.pointer.p); + } else { + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA + " must be a pointer to a buffer pool"); + } + } + } gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; @@ -137,6 +155,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { grpc_winsocket_destroy(sp->socket); gpr_free(sp); } + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); } @@ -207,12 +226,13 @@ static grpc_error *prepare_socket(SOCKET sock, goto failure; } - sockname_temp.len = sizeof(struct sockaddr_storage); + int sockname_temp_len = sizeof(struct sockaddr_storage); if (getsockname(sock, (struct sockaddr *)sockname_temp.addr, - &sockname_temp.len) == SOCKET_ERROR) { + &sockname_temp_len) == SOCKET_ERROR) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"); goto failure; } + sockname_temp.len = sockname_temp_len; *port = grpc_sockaddr_get_port(&sockname_temp); return GRPC_ERROR_NONE; @@ -357,8 +377,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message); gpr_free(utf8_message); } + int peer_name_len = (int)peer_name.len; err = - getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name.len); + getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name_len); + peer_name.len = peer_name_len; if (!err) { peer_name_string = grpc_sockaddr_to_uri(&peer_name); } else { @@ -368,7 +390,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), - peer_name_string); + sp->server->resource_quota, peer_name_string); gpr_free(fd_name); gpr_free(peer_name_string); } else { @@ -466,10 +488,11 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { for (sp = s->head; sp; sp = sp->next) { - sockname_temp.len = sizeof(struct sockaddr_storage); + int sockname_temp_len = sizeof(struct sockaddr_storage); if (0 == getsockname(sp->socket->socket, (struct sockaddr *)sockname_temp.addr, - &sockname_temp.len)) { + &sockname_temp_len)) { + sockname_temp.len = sockname_temp_len; *port = grpc_sockaddr_get_port(&sockname_temp); if (*port > 0) { allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index bc1b91e109f..f825057c0e5 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -109,14 +109,29 @@ typedef struct grpc_tcp { grpc_slice_buffer *write_slices; grpc_slice_buffer *read_slices; + grpc_resource_user resource_user; + /* The IO Completion Port runs from another thread. We need some mechanism to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; + gpr_atm resource_user_shutdown_count; + char *peer_string; } grpc_tcp; +static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); + +static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, + grpc_tcp *tcp) { + if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) { + grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, + grpc_closure_create(win_unref_closure, tcp)); + } +} + static void tcp_free(grpc_tcp *tcp) { grpc_winsocket_destroy(tcp->socket); gpr_mu_destroy(&tcp->mu); @@ -155,6 +170,11 @@ static void tcp_unref(grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif +static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + TCP_UNREF(arg, "resource_user"); +} + /* Asynchronous callback from the IOCP, or the background thread. */ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; @@ -376,12 +396,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { callback. See the comments in on_read and on_write. */ tcp->shutting_down = 1; grpc_winsocket_shutdown(tcp->socket); + win_maybe_shutdown_resource_user(exec_ctx, tcp); gpr_mu_unlock(&tcp->mu); } static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; + win_maybe_shutdown_resource_user(exec_ctx, tcp); TCP_UNREF(tcp, "destroy"); } @@ -392,6 +414,11 @@ static char *win_get_peer(grpc_endpoint *ep) { static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } +static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return &tcp->resource_user; +} + static grpc_endpoint_vtable vtable = {win_read, win_write, win_get_workqueue, @@ -399,18 +426,22 @@ static grpc_endpoint_vtable vtable = {win_read, win_add_to_pollset_set, win_shutdown, win_destroy, + win_get_resource_user, win_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, + grpc_resource_quota *resource_quota, + char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->socket = socket; gpr_mu_init(&tcp->mu); - gpr_ref_init(&tcp->refcount, 1); + gpr_ref_init(&tcp->refcount, 2); grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); + grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 86d777235ef..4402de1c385 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -50,7 +50,9 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string); +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, + grpc_resource_quota *resource_quota, + char *peer_string); grpc_error *grpc_tcp_prepare_socket(SOCKET sock);