Merge pull request #18410 from pfreixes/dead_lock_watch_connectivity

Fix watcher connectivity dead lock
pull/19876/head
Vijay Pai 5 years ago committed by GitHub
commit 34673a3e3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      src/core/ext/filters/client_channel/channel_connectivity.cc
  2. 75
      test/core/end2end/tests/connectivity.cc

@ -111,6 +111,12 @@ static void finished_completion(void* pw, grpc_cq_completion* ignored) {
static void partly_done(state_watcher* w, bool due_to_completion,
grpc_error* error) {
bool end_op = false;
void* end_op_tag = nullptr;
grpc_error* end_op_error = nullptr;
grpc_completion_queue* end_op_cq = nullptr;
grpc_cq_completion* end_op_completion_storage = nullptr;
if (due_to_completion) {
grpc_timer_cancel(&w->alarm);
} else {
@ -152,8 +158,11 @@ static void partly_done(state_watcher* w, bool due_to_completion,
w->error = error;
}
w->phase = CALLING_BACK_AND_FINISHED;
grpc_cq_end_op(w->cq, w->tag, w->error, finished_completion, w,
&w->completion_storage);
end_op = true;
end_op_cq = w->cq;
end_op_tag = w->tag;
end_op_error = w->error;
end_op_completion_storage = &w->completion_storage;
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
@ -161,6 +170,11 @@ static void partly_done(state_watcher* w, bool due_to_completion,
}
gpr_mu_unlock(&w->mu);
if (end_op) {
grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, finished_completion, w,
end_op_completion_storage);
}
GRPC_ERROR_UNREF(error);
}

@ -33,6 +33,16 @@ typedef struct {
grpc_completion_queue* cq;
} child_events;
struct CallbackContext {
grpc_experimental_completion_queue_functor functor;
gpr_event finished;
explicit CallbackContext(void (*cb)(
grpc_experimental_completion_queue_functor* functor, int success)) {
functor.functor_run = cb;
gpr_event_init(&finished);
}
};
static void child_thread(void* arg) {
child_events* ce = static_cast<child_events*>(arg);
grpc_event ev;
@ -163,9 +173,74 @@ static void test_connectivity(grpc_end2end_test_config config) {
cq_verifier_destroy(cqv);
}
static void cb_watch_connectivity(
grpc_experimental_completion_queue_functor* functor, int success) {
CallbackContext* cb_ctx = (CallbackContext*)functor;
gpr_log(GPR_DEBUG, "cb_watch_connectivity called, verifying");
/* callback must not have errors */
GPR_ASSERT(success != 0);
gpr_event_set(&cb_ctx->finished, (void*)1);
}
static void cb_shutdown(grpc_experimental_completion_queue_functor* functor,
int success) {
CallbackContext* cb_ctx = (CallbackContext*)functor;
gpr_log(GPR_DEBUG, "cb_shutdown called, nothing to do");
gpr_event_set(&cb_ctx->finished, (void*)1);
}
static void test_watch_connectivity_cq_callback(
grpc_end2end_test_config config) {
CallbackContext cb_ctx(cb_watch_connectivity);
CallbackContext cb_shutdown_ctx(cb_shutdown);
grpc_completion_queue* cq;
grpc_end2end_test_fixture f = config.create_fixture(nullptr, nullptr);
config.init_client(&f, nullptr);
/* start connecting */
grpc_channel_check_connectivity_state(f.client, 1);
/* create the cq callback */
cq = grpc_completion_queue_create_for_callback(&cb_shutdown_ctx.functor,
nullptr);
/* start watching for any change, cb is immediately called
* and no dead lock should be raised */
grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
grpc_timeout_seconds_to_deadline(3), cq,
&cb_ctx.functor);
/* we just check that the callback was executed once notifying a connection
* transition */
GPR_ASSERT(gpr_event_wait(&cb_ctx.finished,
gpr_inf_future(GPR_CLOCK_MONOTONIC)) != nullptr);
/* shutdown, since shutdown cb might be executed in a background thread
* we actively wait till is executed. */
grpc_completion_queue_shutdown(cq);
gpr_event_wait(&cb_shutdown_ctx.finished,
gpr_inf_future(GPR_CLOCK_MONOTONIC));
/* cleanup */
grpc_channel_destroy(f.client);
grpc_completion_queue_destroy(cq);
/* shutdown_cq and cq are not used in this test */
grpc_completion_queue_destroy(f.cq);
grpc_completion_queue_destroy(f.shutdown_cq);
config.tear_down_data(&f);
}
void connectivity(grpc_end2end_test_config config) {
GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION);
test_connectivity(config);
test_watch_connectivity_cq_callback(config);
}
void connectivity_pre_init(void) {}

Loading…
Cancel
Save