Merge pull request #8962 from ctiller/fixit29

Fix race if connector is shutdown while connecting
pull/8969/head
Craig Tiller 8 years ago committed by GitHub
commit dc74a4d58b
  1. 10
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  2. 4
      src/core/lib/surface/call.c
  3. 12
      src/cpp/server/server_cc.cc

@ -56,6 +56,7 @@ typedef struct {
gpr_refcount refs; gpr_refcount refs;
bool shutdown; bool shutdown;
bool connecting;
char *server_name; char *server_name;
grpc_chttp2_create_handshakers_func create_handshakers; grpc_chttp2_create_handshakers_func create_handshakers;
@ -103,7 +104,9 @@ static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
} }
// If handshaking is not yet in progress, shutdown the endpoint. // If handshaking is not yet in progress, shutdown the endpoint.
// Otherwise, the handshaker will do this for us. // Otherwise, the handshaker will do this for us.
if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); if (!c->connecting && c->endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint);
}
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
} }
@ -192,6 +195,8 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
chttp2_connector *c = arg; chttp2_connector *c = arg;
gpr_mu_lock(&c->mu); gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting);
c->connecting = false;
if (error != GRPC_ERROR_NONE || c->shutdown) { if (error != GRPC_ERROR_NONE || c->shutdown) {
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE("connector shutdown"); error = GRPC_ERROR_CREATE("connector shutdown");
@ -202,6 +207,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_closure *notify = c->notify; grpc_closure *notify = c->notify;
c->notify = NULL; c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg); chttp2_connector_unref(exec_ctx, arg);
} else { } else {
@ -235,6 +241,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(c->endpoint == NULL); GPR_ASSERT(c->endpoint == NULL);
chttp2_connector_ref(con); // Ref taken for callback. chttp2_connector_ref(con); // Ref taken for callback.
grpc_closure_init(&c->connected, connected, c); grpc_closure_init(&c->connected, connected, c);
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
args->interested_parties, args->channel_args, args->interested_parties, args->channel_args,
args->addr, args->deadline); args->addr, args->deadline);

@ -1551,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error; goto done_with_error;
} }
/* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
from server.c. In that case, it's coming from accept_stream, and in
that case we're not necessarily covered by a poller. */
stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = 1; call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata; call->buffered_metadata[0] = op->data.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready, grpc_closure_init(&call->receiving_initial_metadata_ready,

@ -510,12 +510,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
ShutdownTag shutdown_tag; // Dummy shutdown tag ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag); grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
shutdown_cq.Shutdown(); shutdown_cq.Shutdown();
void* tag; void* tag;
@ -531,6 +525,12 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown // successfully shutdown
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// Wait for threads in all ThreadManagers to terminate // Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait(); (*it)->Wait();

Loading…
Cancel
Save