Merge pull request #8553 from ctiller/buffer_pools_for_realsies

Fix windows
reviewable/pr8532/r1^2
Craig Tiller 8 years ago committed by GitHub
commit 1e1711845f
  1. 9
      src/core/lib/iomgr/endpoint_pair_windows.c
  2. 30
      src/core/lib/iomgr/tcp_client_windows.c
  3. 37
      src/core/lib/iomgr/tcp_server_windows.c
  4. 35
      src/core/lib/iomgr/tcp_windows.c
  5. 4
      src/core/lib/iomgr/tcp_windows.h

@ -82,15 +82,16 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock;
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
size_t read_slice_size) {
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
const char *name, grpc_resource_quota *resource_quota,
size_t read_slice_size) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
"endpoint:server");
resource_quota, "endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
"endpoint:client");
resource_quota, "endpoint:client");
return p;
}

@ -43,6 +43,7 @@
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -61,13 +62,16 @@ typedef struct {
int refs;
grpc_closure on_connect;
grpc_endpoint **endpoint;
grpc_resource_quota *resource_quota;
} async_connect;
static void async_connect_unlock_and_cleanup(async_connect *ac,
static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
async_connect *ac,
grpc_winsocket *socket) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
grpc_resource_quota_internal_unref(exec_ctx, ac->resource_quota);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@ -83,7 +87,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (socket != NULL) {
grpc_winsocket_shutdown(socket);
}
async_connect_unlock_and_cleanup(ac, socket);
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
@ -113,12 +117,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (!wsa_success) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else {
*ep = grpc_tcp_create(socket, ac->addr_name);
*ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
socket = NULL;
}
}
async_connect_unlock_and_cleanup(ac, socket);
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
@ -129,6 +133,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) {
SOCKET sock = INVALID_SOCKET;
@ -144,6 +149,17 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_winsocket_callback_info *info;
grpc_error *error = GRPC_ERROR_NONE;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_internal_ref(
channel_args->args[i].value.pointer.p);
}
}
}
*endpoint = NULL;
/* Use dualstack sockets where available. */
@ -177,8 +193,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_sockaddr_make_wildcard6(0, &local_address);
status =
bind(sock, (struct sockaddr *)&local_address.addr, local_address.len);
status = bind(sock, (struct sockaddr *)&local_address.addr,
(int)local_address.len);
if (status != 0) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
@ -206,6 +222,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->resource_quota = resource_quota;
grpc_closure_init(&ac->on_connect, on_connect, ac);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
@ -225,6 +242,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
}

@ -100,14 +100,32 @@ struct grpc_tcp_server {
/* shutdown callback */
grpc_closure *shutdown_complete;
grpc_resource_quota *resource_quota;
};
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_closure *shutdown_complete,
const grpc_channel_args *args,
grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->resource_quota = grpc_resource_quota_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
s->resource_quota =
grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
} else {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
}
}
}
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
@ -137,6 +155,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
}
@ -207,12 +226,13 @@ static grpc_error *prepare_socket(SOCKET sock,
goto failure;
}
sockname_temp.len = sizeof(struct sockaddr_storage);
int sockname_temp_len = sizeof(struct sockaddr_storage);
if (getsockname(sock, (struct sockaddr *)sockname_temp.addr,
&sockname_temp.len) == SOCKET_ERROR) {
&sockname_temp_len) == SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
goto failure;
}
sockname_temp.len = sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
return GRPC_ERROR_NONE;
@ -357,8 +377,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
gpr_free(utf8_message);
}
int peer_name_len = (int)peer_name.len;
err =
getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name.len);
getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name_len);
peer_name.len = peer_name_len;
if (!err) {
peer_name_string = grpc_sockaddr_to_uri(&peer_name);
} else {
@ -368,7 +390,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
}
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
peer_name_string);
sp->server->resource_quota, peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
} else {
@ -466,10 +488,11 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
sockname_temp.len = sizeof(struct sockaddr_storage);
int sockname_temp_len = sizeof(struct sockaddr_storage);
if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)sockname_temp.addr,
&sockname_temp.len)) {
&sockname_temp_len)) {
sockname_temp.len = sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
if (*port > 0) {
allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));

@ -109,14 +109,29 @@ typedef struct grpc_tcp {
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
grpc_resource_user resource_user;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
gpr_atm resource_user_shutdown_count;
char *peer_string;
} grpc_tcp;
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) {
grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
grpc_closure_create(win_unref_closure, tcp));
}
}
static void tcp_free(grpc_tcp *tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
@ -155,6 +170,11 @@ static void tcp_unref(grpc_tcp *tcp) {
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(arg, "resource_user");
}
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = tcpp;
@ -376,12 +396,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
win_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
win_maybe_shutdown_resource_user(exec_ctx, tcp);
TCP_UNREF(tcp, "destroy");
}
@ -392,6 +414,11 @@ static char *win_get_peer(grpc_endpoint *ep) {
static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user;
}
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_get_workqueue,
@ -399,18 +426,22 @@ static grpc_endpoint_vtable vtable = {win_read,
win_add_to_pollset_set,
win_shutdown,
win_destroy,
win_get_resource_user,
win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
gpr_ref_init(&tcp->refcount, 2);
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);

@ -50,7 +50,9 @@
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
char *peer_string);
grpc_error *grpc_tcp_prepare_socket(SOCKET sock);

Loading…
Cancel
Save