Pass ownership of grpc_tcp_server_acceptor to connector.

pull/8979/head
Mark D. Roth 8 years ago
parent 5b850b2194
commit eed3815e9e
  1. 1
      src/core/ext/transport/chttp2/server/chttp2_server.c
  2. 3
      src/core/lib/iomgr/tcp_server.h
  3. 16
      src/core/lib/iomgr/tcp_server_posix.c
  4. 10
      src/core/lib/iomgr/tcp_server_uv.c
  5. 9
      src/core/lib/iomgr/tcp_server_windows.c

@ -166,6 +166,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&connection_state->server_state->mu); gpr_mu_unlock(&connection_state->server_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
gpr_free(connection_state->acceptor);
gpr_free(connection_state); gpr_free(connection_state);
} }

@ -52,7 +52,8 @@ typedef struct grpc_tcp_server_acceptor {
unsigned fd_index; unsigned fd_index;
} grpc_tcp_server_acceptor; } grpc_tcp_server_acceptor;
/* Called for newly connected TCP connections. */ /* Called for newly connected TCP connections.
Takes ownership of acceptor. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep, grpc_endpoint *ep,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,

@ -381,16 +381,18 @@ error:
/* event manager callback when reads are ready */ /* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg; grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
goto error; goto error;
} }
read_notifier_pollset = // Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = sp->fd_index;
grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add( sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) % &sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count]; sp->server->pollset_count];
@ -426,7 +428,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
} }
fdobj = grpc_fd_create(fd, name); grpc_fd *fdobj = grpc_fd_create(fd, name);
if (read_notifier_pollset == NULL) { if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
@ -439,7 +441,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
exec_ctx, sp->server->on_accept_cb_arg, exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota, grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
read_notifier_pollset, &acceptor); read_notifier_pollset, acceptor);
gpr_free(name); gpr_free(name);
gpr_free(addr_str); gpr_free(addr_str);

@ -188,7 +188,6 @@ static void accepted_connection_close_cb(uv_handle_t *handle) {
static void on_connect(uv_stream_t *server, int status) { static void on_connect(uv_stream_t *server, int status) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
uv_tcp_t *client; uv_tcp_t *client;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -201,6 +200,13 @@ static void on_connect(uv_stream_t *server, int status) {
uv_strerror(status)); uv_strerror(status));
return; return;
} }
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
client = gpr_malloc(sizeof(uv_tcp_t)); client = gpr_malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client); uv_tcp_init(uv_default_loop(), client);
// UV documentation says this is guaranteed to succeed // UV documentation says this is guaranteed to succeed
@ -221,7 +227,7 @@ static void on_connect(uv_stream_t *server, int status) {
} }
ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor); acceptor);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
} }

@ -323,7 +323,6 @@ failure:
/* Event manager callback when reads are ready. */ /* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg; grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket; SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
@ -350,6 +349,12 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
return; return;
} }
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
/* The IOCP notified us of a completed operation. Let's grab the results, /* The IOCP notified us of a completed operation. Let's grab the results,
and act accordingly. */ and act accordingly. */
transfered_bytes = 0; transfered_bytes = 0;
@ -397,7 +402,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
managed to accept a connection, and created an endpoint. */ managed to accept a connection, and created an endpoint. */
if (ep) { if (ep) {
sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor); acceptor);
} }
/* As we were notified from the IOCP of one and exactly one accept, /* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned the former socked we created has now either been destroy or assigned

Loading…
Cancel
Save