diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 2662d8466b6..6091184ec94 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -47,12 +47,14 @@ HealthCheckClient::HealthCheckClient( const char* service_name, RefCountedPtr<ConnectedSubchannel> connected_subchannel, grpc_pollset_set* interested_parties, - RefCountedPtr<channelz::SubchannelNode> channelz_node) + RefCountedPtr<channelz::SubchannelNode> channelz_node, + RefCountedPtr<ConnectivityStateWatcherInterface> watcher) : InternallyRefCounted<HealthCheckClient>(&grpc_health_check_client_trace), service_name_(service_name), connected_subchannel_(std::move(connected_subchannel)), interested_parties_(interested_parties), channelz_node_(std::move(channelz_node)), + watcher_(std::move(watcher)), retry_backoff_( BackOff::Options() .set_initial_backoff( @@ -73,43 +75,21 @@ HealthCheckClient::~HealthCheckClient() { if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this); } - GRPC_ERROR_UNREF(error_); -} - -void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state, - grpc_closure* closure) { - MutexLock lock(&mu_); - GPR_ASSERT(notify_state_ == nullptr); - if (*state != state_) { - *state = state_; - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error_)); - return; - } - notify_state_ = state; - on_health_changed_ = closure; } void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state, - grpc_error* error) { + const char* reason) { MutexLock lock(&mu_); - SetHealthStatusLocked(state, error); + SetHealthStatusLocked(state, reason); } void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state, - grpc_error* error) { + const char* reason) { if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { - gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%d error=%s", this, - state, grpc_error_string(error)); - } - if (notify_state_ != 
nullptr && *notify_state_ != state) { - *notify_state_ = state; - notify_state_ = nullptr; - GRPC_CLOSURE_SCHED(on_health_changed_, GRPC_ERROR_REF(error)); - on_health_changed_ = nullptr; + gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this, + ConnectivityStateName(state), reason); } - state_ = state; - GRPC_ERROR_UNREF(error_); - error_ = error; + if (watcher_ != nullptr) watcher_->Notify(state); } void HealthCheckClient::Orphan() { @@ -118,13 +98,8 @@ void HealthCheckClient::Orphan() { } { MutexLock lock(&mu_); - if (on_health_changed_ != nullptr) { - *notify_state_ = GRPC_CHANNEL_SHUTDOWN; - notify_state_ = nullptr; - GRPC_CLOSURE_SCHED(on_health_changed_, GRPC_ERROR_NONE); - on_health_changed_ = nullptr; - } shutting_down_ = true; + watcher_.reset(); call_state_.reset(); if (retry_timer_callback_pending_) { grpc_timer_cancel(&retry_timer_); @@ -141,7 +116,7 @@ void HealthCheckClient::StartCall() { void HealthCheckClient::StartCallLocked() { if (shutting_down_) return; GPR_ASSERT(call_state_ == nullptr); - SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE); + SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch"); call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_); if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this, @@ -152,10 +127,8 @@ void HealthCheckClient::StartRetryTimer() { MutexLock lock(&mu_); - SetHealthStatusLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "health check call failed; will retry after backoff")); + SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, + "health check call failed; will retry after backoff"); grpc_millis next_try = retry_backoff_.NextAttemptTime(); if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) { gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this); @@ -489,10 +462,10 @@ 
void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) { const bool healthy = DecodeResponse(&recv_message_buffer_, &error); const grpc_connectivity_state state = healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE; - if (error == GRPC_ERROR_NONE && !healthy) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("backend unhealthy"); - } - health_check_client_->SetHealthStatus(state, error); + const char* reason = error == GRPC_ERROR_NONE && !healthy + ? "backend unhealthy" + : grpc_error_string(error); + health_check_client_->SetHealthStatus(state, reason); seen_response_.Store(true, MemoryOrder::RELEASE); grpc_slice_buffer_destroy_internal(&recv_message_buffer_); // Start another recv_message batch. @@ -603,7 +576,7 @@ void HealthCheckClient::CallState::RecvTrailingMetadataReady( grpc_slice_from_static_string(kErrorMessage)); } self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY, - GRPC_ERROR_NONE); + kErrorMessage); retry = false; } self->CallEnded(retry); diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index 956c1095550..f8b9ade5ab6 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -47,16 +47,11 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> { HealthCheckClient(const char* service_name, RefCountedPtr<ConnectedSubchannel> connected_subchannel, grpc_pollset_set* interested_parties, - RefCountedPtr<channelz::SubchannelNode> channelz_node); + RefCountedPtr<channelz::SubchannelNode> channelz_node, + RefCountedPtr<ConnectivityStateWatcherInterface> watcher); ~HealthCheckClient(); - // When the health state changes from *state, sets *state to the new - // value and schedules closure. - // Only one closure can be outstanding at a time. 
- void NotifyOnHealthChange(grpc_connectivity_state* state, - grpc_closure* closure); - void Orphan() override; private: @@ -151,9 +146,9 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> { void StartRetryTimer(); static void OnRetryTimer(void* arg, grpc_error* error); - void SetHealthStatus(grpc_connectivity_state state, grpc_error* error); + void SetHealthStatus(grpc_connectivity_state state, const char* reason); void SetHealthStatusLocked(grpc_connectivity_state state, - grpc_error* error); // Requires holding mu_. + const char* reason); // Requires holding mu_. const char* service_name_; // Do not own. RefCountedPtr<ConnectedSubchannel> connected_subchannel_; @@ -161,10 +156,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> { RefCountedPtr<channelz::SubchannelNode> channelz_node_; Mutex mu_; - grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING; - grpc_error* error_ = GRPC_ERROR_NONE; - grpc_connectivity_state* notify_state_ = nullptr; - grpc_closure* on_health_changed_ = nullptr; + RefCountedPtr<ConnectivityStateWatcherInterface> watcher_; bool shutting_down_ = false; // The data associated with the current health check call. It holds a ref diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 2c3f899d2a7..b762349eb62 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -401,7 +401,7 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked( // State needed for tracking the connectivity state with a particular // health check service name. class Subchannel::HealthWatcherMap::HealthWatcher - : public InternallyRefCounted<HealthWatcher> { + : public AsyncConnectivityStateWatcherInterface { public: HealthWatcher(Subchannel* c, UniquePtr<char> health_check_service_name, grpc_connectivity_state subchannel_state) @@ -410,8 +410,6 @@ class Subchannel::HealthWatcherMap::HealthWatcher state_(subchannel_state == GRPC_CHANNEL_READY ? 
GRPC_CHANNEL_CONNECTING : subchannel_state) { GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher"); - GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this, - grpc_schedule_on_exec_ctx); // If the subchannel is already connected, start health checking. if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked(); } @@ -428,7 +426,7 @@ class Subchannel::HealthWatcherMap::HealthWatcher void AddWatcherLocked( grpc_connectivity_state initial_state, - OrphanablePtr<ConnectivityStateWatcherInterface> watcher) { + OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface> watcher) { if (state_ != initial_state) { RefCountedPtr<ConnectedSubchannel> connected_subchannel; if (state_ == GRPC_CHANNEL_READY) { @@ -440,7 +438,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher watcher_list_.AddWatcherLocked(std::move(watcher)); } - void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher) { + void RemoveWatcherLocked( + Subchannel::ConnectivityStateWatcherInterface* watcher) { watcher_list_.RemoveWatcherLocked(watcher); } @@ -473,38 +472,24 @@ class Subchannel::HealthWatcherMap::HealthWatcher } private: + void OnConnectivityStateChange(grpc_connectivity_state new_state) override { + MutexLock lock(&subchannel_->mu_); + if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) { + state_ = new_state; + watcher_list_.NotifyLocked(subchannel_, new_state); + } + } + void StartHealthCheckingLocked() { GPR_ASSERT(health_check_client_ == nullptr); health_check_client_ = MakeOrphanable<HealthCheckClient>( health_check_service_name_.get(), subchannel_->connected_subchannel_, - subchannel_->pollset_set_, subchannel_->channelz_node_); - Ref().release(); // Ref for health callback tracked manually. 
- health_check_client_->NotifyOnHealthChange(&state_, &on_health_changed_); - } - - static void OnHealthChanged(void* arg, grpc_error* error) { - auto* self = static_cast<HealthWatcher*>(arg); - Subchannel* c = self->subchannel_; - { - MutexLock lock(&c->mu_); - if (self->state_ != GRPC_CHANNEL_SHUTDOWN && - self->health_check_client_ != nullptr) { - self->watcher_list_.NotifyLocked(c, self->state_); - // Renew watch. - self->health_check_client_->NotifyOnHealthChange( - &self->state_, &self->on_health_changed_); - return; // So we don't unref below. - } - } - // Don't unref until we've released the lock, because this might - // cause the subchannel (which contains the lock) to be destroyed. - self->Unref(); + subchannel_->pollset_set_, subchannel_->channelz_node_, Ref()); } Subchannel* subchannel_; UniquePtr<char> health_check_service_name_; OrphanablePtr<HealthCheckClient> health_check_client_; - grpc_closure on_health_changed_; grpc_connectivity_state state_; ConnectivityStateWatcherList watcher_list_; }; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 5a657e15b5f..8c3ef368d58 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -1471,6 +1471,34 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthChecking) { EnableDefaultHealthCheckService(false); } +TEST_F(ClientLbEnd2endTest, + RoundRobinWithHealthCheckingHandlesSubchannelFailure) { + EnableDefaultHealthCheckService(true); + // Start servers. 
+ const int kNumServers = 3; + StartServers(kNumServers); + servers_[0]->SetServingStatus("health_check_service_name", true); + servers_[1]->SetServingStatus("health_check_service_name", true); + servers_[2]->SetServingStatus("health_check_service_name", true); + ChannelArguments args; + args.SetServiceConfigJSON( + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name\"}}"); + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator, args); + auto stub = BuildStub(channel); + response_generator.SetNextResolution(GetServersPorts()); + WaitForServer(stub, 0, DEBUG_LOCATION); + // Stop server 0 and send a new resolver result to ensure that RR + // checks each subchannel's state. + servers_[0]->Shutdown(); + response_generator.SetNextResolution(GetServersPorts()); + // Send a bunch more RPCs. + for (size_t i = 0; i < 100; i++) { + SendRpc(stub); + } +} + TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) { EnableDefaultHealthCheckService(true); // Start server.