From 6d8ca69ed3d9bbafc0e3d3f1752b0119b2676e30 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 11 May 2017 14:29:34 -0700 Subject: [PATCH 1/2] Fixes to subchannel and its index --- src/core/ext/filters/client_channel/subchannel.c | 2 +- .../ext/filters/client_channel/subchannel_index.c | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 1af3393a62c..dd14bf1d027 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -283,6 +283,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; + // add a weak ref and subtract a strong ref (atomically) old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { @@ -656,7 +657,6 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, gpr_free(sw_subchannel); grpc_channel_stack_destroy(exec_ctx, stk); gpr_free(con); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); return false; } diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index f6ef4a845e8..b25dbfcf519 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -183,8 +183,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, enter_ctx(exec_ctx); grpc_subchannel *c = NULL; + bool need_to_unref_constructed; while (c == NULL) { + need_to_unref_constructed = false; + // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); @@ -193,9 +196,12 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, // - Check to see if a subchannel already exists c = gpr_avl_get(index, key); + if (c != NULL) { + c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); + } if (c != NULL) { // yes -> we're done - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register"); + need_to_unref_constructed = true; } else { // no -> update the avl and compare/swap gpr_avl updated = @@ -219,6 +225,10 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, leave_ctx(exec_ctx); + if (need_to_unref_constructed) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, constructed, "index_register"); + } + return c; } From 83b34e524f5717d68daca0a3e3f6528441af37f6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 17 May 2017 22:20:05 +0000 Subject: [PATCH 2/2] Fix some races in tests --- .../end2end/fixtures/http_proxy_fixture.c | 19 ++++++++++++------- .../surface/concurrent_connectivity_test.c | 2 ++ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c index c2d8480e69a..708409d8658 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.c +++ b/test/core/end2end/fixtures/http_proxy_fixture.c @@ -60,6 +60,7 @@ #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/util/port.h" @@ -71,6 +72,8 @@ struct grpc_end2end_http_proxy { gpr_mu* mu; grpc_pollset* pollset; gpr_refcount users; + + grpc_combiner *combiner; }; // @@ -400,19 +403,19 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset); grpc_endpoint_add_to_pollset_set(exec_ctx, endpoint, conn->pollset_set); grpc_closure_init(&conn->on_read_request_done, on_read_request_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_closure_init(&conn->on_server_connect_done, on_server_connect_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_closure_init(&conn->on_write_response_done, on_write_response_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_closure_init(&conn->on_client_read_done, on_client_read_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_closure_init(&conn->on_client_write_done, on_client_write_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_closure_init(&conn->on_server_read_done, on_server_read_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_closure_init(&conn->on_server_write_done, on_server_write_done, conn, - grpc_schedule_on_exec_ctx); + grpc_combiner_scheduler(conn->proxy->combiner, false)); grpc_slice_buffer_init(&conn->client_read_buffer); grpc_slice_buffer_init(&conn->client_deferred_write_buffer); grpc_slice_buffer_init(&conn->client_write_buffer); @@ -453,6 +456,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) { grpc_end2end_http_proxy* proxy = (grpc_end2end_http_proxy*)gpr_malloc(sizeof(*proxy)); memset(proxy, 0, sizeof(*proxy)); + proxy->combiner = grpc_combiner_create(NULL); gpr_ref_init(&proxy->users, 1); // Construct proxy address. const int proxy_port = grpc_pick_unused_port_or_die(); @@ -504,6 +508,7 @@ void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { grpc_pollset_shutdown(&exec_ctx, proxy->pollset, grpc_closure_create(destroy_pollset, proxy->pollset, grpc_schedule_on_exec_ctx)); + grpc_combiner_unref(&exec_ctx, proxy->combiner); gpr_free(proxy); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index f0e3394b2e7..87ad095170f 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -112,7 +112,9 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected")); grpc_endpoint_destroy(exec_ctx, tcp); + gpr_mu_lock(args->mu); GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL)); + gpr_mu_unlock(args->mu); } void bad_server_thread(void *vargs) {