diff --git a/src/core/lib/channel/max_age_filter.c b/src/core/lib/channel/max_age_filter.c index 3388b779c8a..4eba057f7c1 100644 --- a/src/core/lib/channel/max_age_filter.c +++ b/src/core/lib/channel/max_age_filter.c @@ -85,6 +85,10 @@ typedef struct channel_data { grpc_closure start_max_age_timer_after_init; // Closure to run when the goaway op is finished and the max_age_timer grpc_closure start_max_age_grace_timer_after_goaway_op; + // Closure to run when the channel connectivity state changes + grpc_closure channel_connectivity_changed; + // Records the current connectivity state + grpc_connectivity_state connectivity_state; // Number of active calls gpr_atm call_count; } channel_data; @@ -97,6 +101,7 @@ static void increase_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer"); grpc_timer_init( exec_ctx, &chand->max_idle_timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_idle), @@ -117,11 +122,17 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, channel_data* chand = arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = true; + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); grpc_timer_init( exec_ctx, &chand->max_age_timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age), &chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC)); gpr_mu_unlock(&chand->max_age_timer_mu); + grpc_transport_op* op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->connectivity_state = &chand->connectivity_state; + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element(chand->channel_stack, 0), op); GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, "max_age start_max_age_timer_after_init"); } @@ -132,6 +143,7 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, channel_data* chand = arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = true; + GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); grpc_timer_init(exec_ctx, &chand->max_age_grace_timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age_grace), @@ -157,6 +169,8 @@ static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, } else if (error != GRPC_ERROR_CANCELLED) { GRPC_LOG_IF_ERROR("close_max_idle_channel", error); } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age max_idle_timer"); } static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, @@ -180,6 +194,8 @@ static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, } else if (error != GRPC_ERROR_CANCELLED) { GRPC_LOG_IF_ERROR("close_max_age_channel", error); } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age max_age_timer"); } static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, @@ -199,6 +215,32 @@ static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, } else if (error != GRPC_ERROR_CANCELLED) { GRPC_LOG_IF_ERROR("force_close_max_age_channel", error); } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, + "max_age max_age_grace_timer"); +} + +static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + channel_data* chand = arg; + if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_transport_op* op = grpc_make_transport_op(NULL); + op->on_connectivity_state_change = &chand->channel_connectivity_changed, + op->connectivity_state = &chand->connectivity_state; + grpc_channel_next_op( + exec_ctx, grpc_channel_stack_element(chand->channel_stack, 0), op); + } else { + gpr_mu_lock(&chand->max_age_timer_mu); + if (chand->max_age_timer_pending) { + grpc_timer_cancel(exec_ctx, &chand->max_age_timer); + chand->max_age_timer_pending = false; + } + if (chand->max_age_grace_timer_pending) { + grpc_timer_cancel(exec_ctx, &chand->max_age_grace_timer); + chand->max_age_grace_timer_pending = false; + } + gpr_mu_unlock(&chand->max_age_timer_mu); + increase_call_count(exec_ctx, chand); + } } // Constructor for call_data. @@ -284,6 +326,9 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_closure_init(&chand->start_max_age_grace_timer_after_goaway_op, start_max_age_grace_timer_after_goaway_op, chand, grpc_schedule_on_exec_ctx); + grpc_closure_init(&chand->channel_connectivity_changed, + channel_connectivity_changed, chand, + grpc_schedule_on_exec_ctx); if (gpr_time_cmp(chand->max_connection_age, gpr_inf_future(GPR_TIMESPAN)) != 0) { @@ -313,18 +358,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) { - channel_data* chand = elem->channel_data; - gpr_mu_lock(&chand->max_age_timer_mu); - if (chand->max_age_timer_pending) { - grpc_timer_cancel(exec_ctx, &chand->max_age_timer); - } - if (chand->max_age_grace_timer_pending) { - grpc_timer_cancel(exec_ctx, &chand->max_age_grace_timer); - } - gpr_mu_unlock(&chand->max_age_timer_mu); - increase_call_count(exec_ctx, chand); -} + grpc_channel_element* elem) {} const grpc_channel_filter grpc_max_age_filter = { grpc_call_next_op, diff --git a/test/core/end2end/tests/max_connection_age.c b/test/core/end2end/tests/max_connection_age.c index 9b31752963a..ee66d125833 100644 --- a/test/core/end2end/tests/max_connection_age.c +++ b/test/core/end2end/tests/max_connection_age.c @@ -196,9 +196,6 @@ static void test_max_age_forcibly_close(grpc_end2end_test_config config) { /* The connection should be closed immediately after the max age grace period, the in-progress RPC should fail. */ GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE); - char *details_str = grpc_slice_to_c_string(details); - gpr_log(GPR_DEBUG, "status: %d, details: %s", status, details_str); - gpr_free(details_str); GPR_ASSERT(0 == grpc_slice_str_cmp(details, "Endpoint read failed")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); validate_host_override_string("foo.test.google.fr:1234", call_details.host,