Merge github.com:grpc/grpc into grpc_slice

reviewable/pr8532/r1
Craig Tiller 8 years ago
commit 1a2c9c1464
  1. 9
      src/core/lib/iomgr/endpoint_pair_windows.c
  2. 30
      src/core/lib/iomgr/tcp_client_windows.c
  3. 37
      src/core/lib/iomgr/tcp_server_windows.c
  4. 35
      src/core/lib/iomgr/tcp_windows.c
  5. 4
      src/core/lib/iomgr/tcp_windows.h

@ -82,15 +82,16 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock; sv[0] = svr_sock;
} }
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
size_t read_slice_size) { const char *name, grpc_resource_quota *resource_quota,
size_t read_slice_size) {
SOCKET sv[2]; SOCKET sv[2];
grpc_endpoint_pair p; grpc_endpoint_pair p;
create_sockets(sv); create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
"endpoint:server"); resource_quota, "endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
"endpoint:client"); resource_quota, "endpoint:client");
return p; return p;
} }

@ -43,6 +43,7 @@
#include <grpc/support/log_windows.h> #include <grpc/support/log_windows.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
@ -61,13 +62,16 @@ typedef struct {
int refs; int refs;
grpc_closure on_connect; grpc_closure on_connect;
grpc_endpoint **endpoint; grpc_endpoint **endpoint;
grpc_resource_quota *resource_quota;
} async_connect; } async_connect;
static void async_connect_unlock_and_cleanup(async_connect *ac, static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
async_connect *ac,
grpc_winsocket *socket) { grpc_winsocket *socket) {
int done = (--ac->refs == 0); int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu); gpr_mu_unlock(&ac->mu);
if (done) { if (done) {
grpc_resource_quota_internal_unref(exec_ctx, ac->resource_quota);
gpr_mu_destroy(&ac->mu); gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name); gpr_free(ac->addr_name);
gpr_free(ac); gpr_free(ac);
@ -83,7 +87,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (socket != NULL) { if (socket != NULL) {
grpc_winsocket_shutdown(socket); grpc_winsocket_shutdown(socket);
} }
async_connect_unlock_and_cleanup(ac, socket); async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
} }
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
@ -113,12 +117,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (!wsa_success) { if (!wsa_success) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else { } else {
*ep = grpc_tcp_create(socket, ac->addr_name); *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
socket = NULL; socket = NULL;
} }
} }
async_connect_unlock_and_cleanup(ac, socket); async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when /* If the connection was aborted, the callback was already called when
the deadline was met. */ the deadline was met. */
grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
@ -129,6 +133,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_endpoint **endpoint, grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties, grpc_pollset_set *interested_parties,
const grpc_channel_args *channel_args,
const grpc_resolved_address *addr, const grpc_resolved_address *addr,
gpr_timespec deadline) { gpr_timespec deadline) {
SOCKET sock = INVALID_SOCKET; SOCKET sock = INVALID_SOCKET;
@ -144,6 +149,17 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_winsocket_callback_info *info; grpc_winsocket_callback_info *info;
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
resource_quota = grpc_resource_quota_internal_ref(
channel_args->args[i].value.pointer.p);
}
}
}
*endpoint = NULL; *endpoint = NULL;
/* Use dualstack sockets where available. */ /* Use dualstack sockets where available. */
@ -177,8 +193,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_sockaddr_make_wildcard6(0, &local_address); grpc_sockaddr_make_wildcard6(0, &local_address);
status = status = bind(sock, (struct sockaddr *)&local_address.addr,
bind(sock, (struct sockaddr *)&local_address.addr, local_address.len); (int)local_address.len);
if (status != 0) { if (status != 0) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind"); error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure; goto failure;
@ -206,6 +222,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->refs = 2; ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr); ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint; ac->endpoint = endpoint;
ac->resource_quota = resource_quota;
grpc_closure_init(&ac->on_connect, on_connect, ac); grpc_closure_init(&ac->on_connect, on_connect, ac);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac, grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
@ -225,6 +242,7 @@ failure:
} else if (sock != INVALID_SOCKET) { } else if (sock != INVALID_SOCKET) {
closesocket(sock); closesocket(sock);
} }
grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
} }

@ -100,14 +100,32 @@ struct grpc_tcp_server {
/* shutdown callback */ /* shutdown callback */
grpc_closure *shutdown_complete; grpc_closure *shutdown_complete;
grpc_resource_quota *resource_quota;
}; };
/* Public function. Allocates the proper data structures to hold a /* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */ grpc_tcp_server. */
grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_closure *shutdown_complete,
const grpc_channel_args *args, const grpc_channel_args *args,
grpc_tcp_server **server) { grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->resource_quota = grpc_resource_quota_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
s->resource_quota =
grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
} else {
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
}
}
}
gpr_ref_init(&s->refs, 1); gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu); gpr_mu_init(&s->mu);
s->active_ports = 0; s->active_ports = 0;
@ -137,6 +155,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_winsocket_destroy(sp->socket); grpc_winsocket_destroy(sp->socket);
gpr_free(sp); gpr_free(sp);
} }
grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s); gpr_free(s);
} }
@ -207,12 +226,13 @@ static grpc_error *prepare_socket(SOCKET sock,
goto failure; goto failure;
} }
sockname_temp.len = sizeof(struct sockaddr_storage); int sockname_temp_len = sizeof(struct sockaddr_storage);
if (getsockname(sock, (struct sockaddr *)sockname_temp.addr, if (getsockname(sock, (struct sockaddr *)sockname_temp.addr,
&sockname_temp.len) == SOCKET_ERROR) { &sockname_temp_len) == SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"); error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
goto failure; goto failure;
} }
sockname_temp.len = sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp); *port = grpc_sockaddr_get_port(&sockname_temp);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
@ -357,8 +377,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message); gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
gpr_free(utf8_message); gpr_free(utf8_message);
} }
int peer_name_len = (int)peer_name.len;
err = err =
getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name.len); getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name_len);
peer_name.len = peer_name_len;
if (!err) { if (!err) {
peer_name_string = grpc_sockaddr_to_uri(&peer_name); peer_name_string = grpc_sockaddr_to_uri(&peer_name);
} else { } else {
@ -368,7 +390,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} }
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
peer_name_string); sp->server->resource_quota, peer_name_string);
gpr_free(fd_name); gpr_free(fd_name);
gpr_free(peer_name_string); gpr_free(peer_name_string);
} else { } else {
@ -466,10 +488,11 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
as some previously created listener. */ as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) { if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) { for (sp = s->head; sp; sp = sp->next) {
sockname_temp.len = sizeof(struct sockaddr_storage); int sockname_temp_len = sizeof(struct sockaddr_storage);
if (0 == getsockname(sp->socket->socket, if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)sockname_temp.addr, (struct sockaddr *)sockname_temp.addr,
&sockname_temp.len)) { &sockname_temp_len)) {
sockname_temp.len = sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp); *port = grpc_sockaddr_get_port(&sockname_temp);
if (*port > 0) { if (*port > 0) {
allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));

@ -109,14 +109,29 @@ typedef struct grpc_tcp {
grpc_slice_buffer *write_slices; grpc_slice_buffer *write_slices;
grpc_slice_buffer *read_slices; grpc_slice_buffer *read_slices;
grpc_resource_user resource_user;
/* The IO Completion Port runs from another thread. We need some mechanism /* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */ to protect ourselves when requesting a shutdown. */
gpr_mu mu; gpr_mu mu;
int shutting_down; int shutting_down;
gpr_atm resource_user_shutdown_count;
char *peer_string; char *peer_string;
} grpc_tcp; } grpc_tcp;
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
grpc_tcp *tcp) {
if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) {
grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
grpc_closure_create(win_unref_closure, tcp));
}
}
static void tcp_free(grpc_tcp *tcp) { static void tcp_free(grpc_tcp *tcp) {
grpc_winsocket_destroy(tcp->socket); grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu); gpr_mu_destroy(&tcp->mu);
@ -155,6 +170,11 @@ static void tcp_unref(grpc_tcp *tcp) {
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif #endif
static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
TCP_UNREF(arg, "resource_user");
}
/* Asynchronous callback from the IOCP, or the background thread. */ /* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = tcpp; grpc_tcp *tcp = tcpp;
@ -376,12 +396,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
callback. See the comments in on_read and on_write. */ callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1; tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket); grpc_winsocket_shutdown(tcp->socket);
win_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_mu_unlock(&tcp->mu); gpr_mu_unlock(&tcp->mu);
} }
static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep); grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep; grpc_tcp *tcp = (grpc_tcp *)ep;
win_maybe_shutdown_resource_user(exec_ctx, tcp);
TCP_UNREF(tcp, "destroy"); TCP_UNREF(tcp, "destroy");
} }
@ -392,6 +414,11 @@ static char *win_get_peer(grpc_endpoint *ep) {
static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return &tcp->resource_user;
}
static grpc_endpoint_vtable vtable = {win_read, static grpc_endpoint_vtable vtable = {win_read,
win_write, win_write,
win_get_workqueue, win_get_workqueue,
@ -399,18 +426,22 @@ static grpc_endpoint_vtable vtable = {win_read,
win_add_to_pollset_set, win_add_to_pollset_set,
win_shutdown, win_shutdown,
win_destroy, win_destroy,
win_get_resource_user,
win_get_peer}; win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable; tcp->base.vtable = &vtable;
tcp->socket = socket; tcp->socket = socket;
gpr_mu_init(&tcp->mu); gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1); gpr_ref_init(&tcp->refcount, 2);
grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string); tcp->peer_string = gpr_strdup(peer_string);
grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */ /* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base); grpc_network_status_register_endpoint(&tcp->base);

@ -50,7 +50,9 @@
/* Create a tcp endpoint given a winsock handle. /* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle. * Takes ownership of the handle.
*/ */
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string); grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
char *peer_string);
grpc_error *grpc_tcp_prepare_socket(SOCKET sock); grpc_error *grpc_tcp_prepare_socket(SOCKET sock);

Loading…
Cancel
Save