Merge pull request #16979 from yashykt/connector_deadlock

Fix deadlock issue in HTTP2 connector
pull/17014/head
Yash Tibrewal 7 years ago committed by GitHub
commit 08ef3bca1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  2. 17
      src/core/lib/iomgr/tcp_client_custom.cc
  3. 27
      test/core/iomgr/tcp_client_uv_test.cc
  4. 40
      test/core/iomgr/tcp_server_uv_test.cc

@ -212,9 +212,17 @@ static void chttp2_connector_connect(grpc_connector* con,
GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
GPR_ASSERT(!c->connecting); GPR_ASSERT(!c->connecting);
c->connecting = true; c->connecting = true;
grpc_tcp_client_connect(&c->connected, &c->endpoint, args->interested_parties, grpc_closure* closure = &c->connected;
args->channel_args, &addr, args->deadline); grpc_endpoint** ep = &c->endpoint;
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
// In some implementations, the closure can be flushed before
// grpc_tcp_client_connect and since the closure requires access to c->mu,
// this can result in a deadlock. Refer
// https://github.com/grpc/grpc/issues/16427
// grpc_tcp_client_connect would fill c->endpoint with proper contents and we
// make sure that we would still exist at that point by taking a ref.
grpc_tcp_client_connect(closure, ep, args->interested_parties,
args->channel_args, &addr, args->deadline);
} }
static const grpc_connector_vtable chttp2_connector_vtable = { static const grpc_connector_vtable chttp2_connector_vtable = {

@ -81,9 +81,8 @@ static void on_alarm(void* acp, grpc_error* error) {
} }
} }
static void custom_connect_callback(grpc_custom_socket* socket, static void custom_connect_callback_internal(grpc_custom_socket* socket,
grpc_error* error) { grpc_error* error) {
grpc_core::ExecCtx exec_ctx;
grpc_custom_tcp_connect* connect = socket->connector; grpc_custom_tcp_connect* connect = socket->connector;
int done; int done;
grpc_closure* closure = connect->closure; grpc_closure* closure = connect->closure;
@ -100,6 +99,18 @@ static void custom_connect_callback(grpc_custom_socket* socket,
GRPC_CLOSURE_SCHED(closure, error); GRPC_CLOSURE_SCHED(closure, error);
} }
static void custom_connect_callback(grpc_custom_socket* socket,
grpc_error* error) {
if (grpc_core::ExecCtx::Get() == nullptr) {
/* If we are being run on a thread which does not have an exec_ctx created
* yet, we should create one. */
grpc_core::ExecCtx exec_ctx;
custom_connect_callback_internal(socket, error);
} else {
custom_connect_callback_internal(socket, error);
}
}
static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args, const grpc_channel_args* channel_args,

@ -129,6 +129,7 @@ void test_succeeds(void) {
uv_close((uv_handle_t*)svr_handle, close_cb); uv_close((uv_handle_t*)svr_handle, close_cb);
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
grpc_core::ExecCtx::Get()->Flush();
} }
void test_fails(void) { void test_fails(void) {
@ -178,6 +179,7 @@ void test_fails(void) {
} }
gpr_mu_unlock(g_mu); gpr_mu_unlock(g_mu);
grpc_core::ExecCtx::Get()->Flush();
} }
static void destroy_pollset(void* p, grpc_error* error) { static void destroy_pollset(void* p, grpc_error* error) {
@ -186,21 +188,22 @@ static void destroy_pollset(void* p, grpc_error* error) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_closure destroyed; grpc_closure destroyed;
grpc_core::ExecCtx exec_ctx;
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init(); grpc_init();
g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size())); {
grpc_pollset_init(g_pollset, &g_mu); grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size()));
test_succeeds(); grpc_pollset_init(g_pollset, &g_mu);
gpr_log(GPR_ERROR, "End of first test");
test_fails(); test_succeeds();
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, gpr_log(GPR_ERROR, "End of first test");
grpc_schedule_on_exec_ctx); test_fails();
grpc_pollset_shutdown(g_pollset, &destroyed); GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(g_pollset, &destroyed);
gpr_free(g_pollset);
}
grpc_shutdown(); grpc_shutdown();
gpr_free(g_pollset);
return 0; return 0;
} }

@ -119,6 +119,7 @@ static void test_no_op(void) {
grpc_tcp_server* s; grpc_tcp_server* s;
GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s)); GPR_ASSERT(GRPC_ERROR_NONE == grpc_tcp_server_create(NULL, NULL, &s));
grpc_tcp_server_unref(s); grpc_tcp_server_unref(s);
grpc_core::ExecCtx::Get()->Flush();
} }
static void test_no_op_with_start(void) { static void test_no_op_with_start(void) {
@ -128,6 +129,7 @@ static void test_no_op_with_start(void) {
LOG_TEST("test_no_op_with_start"); LOG_TEST("test_no_op_with_start");
grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_unref(s); grpc_tcp_server_unref(s);
grpc_core::ExecCtx::Get()->Flush();
} }
static void test_no_op_with_port(void) { static void test_no_op_with_port(void) {
@ -147,6 +149,7 @@ static void test_no_op_with_port(void) {
port > 0); port > 0);
grpc_tcp_server_unref(s); grpc_tcp_server_unref(s);
grpc_core::ExecCtx::Get()->Flush();
} }
static void test_no_op_with_port_and_start(void) { static void test_no_op_with_port_and_start(void) {
@ -168,6 +171,7 @@ static void test_no_op_with_port_and_start(void) {
grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_unref(s); grpc_tcp_server_unref(s);
grpc_core::ExecCtx::Get()->Flush();
} }
static void connect_cb(uv_connect_t* req, int status) { static void connect_cb(uv_connect_t* req, int status) {
@ -273,7 +277,7 @@ static void test_connect(unsigned n) {
GPR_ASSERT(weak_ref.server != NULL); GPR_ASSERT(weak_ref.server != NULL);
grpc_tcp_server_unref(s); grpc_tcp_server_unref(s);
grpc_core::ExecCtx::Get()->Flush();
/* Weak ref lost. */ /* Weak ref lost. */
GPR_ASSERT(weak_ref.server == NULL); GPR_ASSERT(weak_ref.server == NULL);
} }
@ -284,25 +288,27 @@ static void destroy_pollset(void* p, grpc_error* error) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
grpc_closure destroyed; grpc_closure destroyed;
grpc_core::ExecCtx exec_ctx;
grpc_test_init(argc, argv); grpc_test_init(argc, argv);
grpc_init(); grpc_init();
g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size())); {
grpc_pollset_init(g_pollset, &g_mu); grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast<grpc_pollset*>(gpr_malloc(grpc_pollset_size()));
test_no_op(); grpc_pollset_init(g_pollset, &g_mu);
test_no_op_with_start();
test_no_op_with_port(); test_no_op();
test_no_op_with_port_and_start(); test_no_op_with_start();
test_connect(1); test_no_op_with_port();
test_connect(10); test_no_op_with_port_and_start();
test_connect(1);
GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, test_connect(10);
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(g_pollset, &destroyed); GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
grpc_schedule_on_exec_ctx);
grpc_pollset_shutdown(g_pollset, &destroyed);
gpr_free(g_pollset);
}
grpc_shutdown(); grpc_shutdown();
gpr_free(g_pollset);
return 0; return 0;
} }

Loading…
Cancel
Save