diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index 232183d61ff..58170e66f96 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -111,6 +111,12 @@ static void finished_completion(void* pw, grpc_cq_completion* ignored) { static void partly_done(state_watcher* w, bool due_to_completion, grpc_error* error) { + bool end_op = false; + void* end_op_tag = nullptr; + grpc_error* end_op_error = nullptr; + grpc_completion_queue* end_op_cq = nullptr; + grpc_cq_completion* end_op_completion_storage = nullptr; + if (due_to_completion) { grpc_timer_cancel(&w->alarm); } else { @@ -152,8 +158,11 @@ static void partly_done(state_watcher* w, bool due_to_completion, w->error = error; } w->phase = CALLING_BACK_AND_FINISHED; - grpc_cq_end_op(w->cq, w->tag, w->error, finished_completion, w, - &w->completion_storage); + end_op = true; + end_op_cq = w->cq; + end_op_tag = w->tag; + end_op_error = w->error; + end_op_completion_storage = &w->completion_storage; break; case CALLING_BACK_AND_FINISHED: GPR_UNREACHABLE_CODE(return ); @@ -161,6 +170,11 @@ static void partly_done(state_watcher* w, bool due_to_completion, } gpr_mu_unlock(&w->mu); + if (end_op) { + grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, finished_completion, w, + end_op_completion_storage); + } + GRPC_ERROR_UNREF(error); } diff --git a/test/core/end2end/tests/connectivity.cc b/test/core/end2end/tests/connectivity.cc index caa4265aa2c..5511e55ccc7 100644 --- a/test/core/end2end/tests/connectivity.cc +++ b/test/core/end2end/tests/connectivity.cc @@ -33,6 +33,16 @@ typedef struct { grpc_completion_queue* cq; } child_events; +struct CallbackContext { + grpc_experimental_completion_queue_functor functor; + gpr_event finished; + explicit CallbackContext(void (*cb)( + grpc_experimental_completion_queue_functor* functor, int success)) { + functor.functor_run = cb; + gpr_event_init(&finished); + } +}; + static void child_thread(void* arg) { child_events* ce = static_cast(arg); grpc_event ev; @@ -163,9 +173,74 @@ static void test_connectivity(grpc_end2end_test_config config) { cq_verifier_destroy(cqv); } +static void cb_watch_connectivity( + grpc_experimental_completion_queue_functor* functor, int success) { + CallbackContext* cb_ctx = (CallbackContext*)functor; + + gpr_log(GPR_DEBUG, "cb_watch_connectivity called, verifying"); + + /* callback must not have errors */ + GPR_ASSERT(success != 0); + + gpr_event_set(&cb_ctx->finished, (void*)1); +} + +static void cb_shutdown(grpc_experimental_completion_queue_functor* functor, + int success) { + CallbackContext* cb_ctx = (CallbackContext*)functor; + + gpr_log(GPR_DEBUG, "cb_shutdown called, nothing to do"); + gpr_event_set(&cb_ctx->finished, (void*)1); +} + +static void test_watch_connectivity_cq_callback( + grpc_end2end_test_config config) { + CallbackContext cb_ctx(cb_watch_connectivity); + CallbackContext cb_shutdown_ctx(cb_shutdown); + grpc_completion_queue* cq; + grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr); + + config.init_client(&f, nullptr); + + /* start connecting */ + grpc_channel_check_connectivity_state(f.client, 1); + + /* create the cq callback */ + cq = grpc_completion_queue_create_for_callback(&cb_shutdown_ctx.functor, + nullptr); + + /* start watching for any change, cb is immediately called + * and no dead lock should be raised */ + grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE, + grpc_timeout_seconds_to_deadline(3), cq, + &cb_ctx.functor); + + /* we just check that the callback was executed once notifying a connection + * transition */ + GPR_ASSERT(gpr_event_wait(&cb_ctx.finished, + gpr_inf_future(GPR_CLOCK_MONOTONIC)) != nullptr); + + /* shutdown, since shutdown cb might be executed in a background thread + * we actively wait till is executed. */ + grpc_completion_queue_shutdown(cq); + gpr_event_wait(&cb_shutdown_ctx.finished, + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + + /* cleanup */ + grpc_channel_destroy(f.client); + grpc_completion_queue_destroy(cq); + + /* shutdown_cq and cq are not used in this test */ + grpc_completion_queue_destroy(f.cq); + grpc_completion_queue_destroy(f.shutdown_cq); + + config.tear_down_data(&f); +} + void connectivity(grpc_end2end_test_config config) { GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION); test_connectivity(config); + test_watch_connectivity_cq_callback(config); } void connectivity_pre_init(void) {}