From c8430023a5c137289905e66191897ed88d7ea3e0 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 28 Aug 2019 09:05:59 -0700 Subject: [PATCH] Apply health check service name changes to existing subchannels. --- .../filters/client_channel/client_channel.cc | 90 ++++++++++++++++--- test/cpp/end2end/client_lb_end2end_test.cc | 43 ++++++++- 2 files changed, 119 insertions(+), 14 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index db5ddcb46b9..00d21987777 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -273,6 +273,12 @@ class ChannelData { bool received_first_resolver_result_ = false; // The number of SubchannelWrapper instances referencing a given Subchannel. Map subchannel_refcount_map_; + // The set of SubchannelWrappers that currently exist. + // No need to hold a ref, since the map is updated in the control-plane + // combiner when the SubchannelWrappers are created and destroyed. + // TODO(roth): We really want to use a set here, not a map. Since we don't + // currently have a set implementation, we use a map and ignore the value. + Map subchannel_wrappers_; // Pending ConnectedSubchannel updates for each SubchannelWrapper. // Updates are queued here in the control plane combiner and then applied // in the data plane combiner when the picker is updated. @@ -799,14 +805,14 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper"); auto* subchannel_node = subchannel_->channelz_node(); if (subchannel_node != nullptr) { - intptr_t subchannel_uuid = subchannel_node->uuid(); auto it = chand_->subchannel_refcount_map_.find(subchannel_); if (it == chand_->subchannel_refcount_map_.end()) { - chand_->channelz_node_->AddChildSubchannel(subchannel_uuid); + chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid()); it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first; } ++it->second; } + chand_->subchannel_wrappers_[this] = true; } ~SubchannelWrapper() { @@ -815,14 +821,14 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { "chand=%p: destroying subchannel wrapper %p for subchannel %p", chand_, this, subchannel_); } + chand_->subchannel_wrappers_.erase(this); auto* subchannel_node = subchannel_->channelz_node(); if (subchannel_node != nullptr) { - intptr_t subchannel_uuid = subchannel_node->uuid(); auto it = chand_->subchannel_refcount_map_.find(subchannel_); GPR_ASSERT(it != chand_->subchannel_refcount_map_.end()); --it->second; if (it->second == 0) { - chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid); + chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid()); chand_->subchannel_refcount_map_.erase(it); } } @@ -844,8 +850,9 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { UniquePtr watcher) override { auto& watcher_wrapper = watcher_map_[watcher.get()]; GPR_ASSERT(watcher_wrapper == nullptr); - watcher_wrapper = New( - std::move(watcher), Ref(DEBUG_LOCATION, "WatcherWrapper")); + watcher_wrapper = New(std::move(watcher), + Ref(DEBUG_LOCATION, "WatcherWrapper"), + initial_state); subchannel_->WatchConnectivityState( initial_state, UniquePtr(gpr_strdup(health_check_service_name_.get())), @@ -870,6 +877,40 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { return subchannel_->channel_args(); } + void UpdateHealthCheckServiceName(UniquePtr health_check_service_name) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { + gpr_log(GPR_INFO, + "chand=%p: subchannel wrapper %p: updating health check service " + "name from \"%s\" to \"%s\"", + chand_, this, health_check_service_name_.get(), + health_check_service_name.get()); + } + for (auto& p : watcher_map_) { + WatcherWrapper*& watcher_wrapper = p.second; + // Cancel the current watcher and create a new one using the new + // health check service name. + // TODO(roth): If there is not already an existing health watch + // call for the new name, then the watcher will initially report + // state CONNECTING. If the LB policy is currently reporting + // state READY, this may cause it to switch to CONNECTING before + // switching back to READY. This could cause a small delay for + // RPCs being started on the channel. If/when this becomes a + // problem, we may be able to handle it by waiting for the new + // watcher to report READY before we use it to replace the old one. + WatcherWrapper* replacement = watcher_wrapper->MakeReplacement(); + subchannel_->CancelConnectivityStateWatch( + health_check_service_name_.get(), watcher_wrapper); + watcher_wrapper = replacement; + subchannel_->WatchConnectivityState( + replacement->last_seen_state(), + UniquePtr(gpr_strdup(health_check_service_name.get())), + OrphanablePtr( + replacement)); + } + // Save the new health check service name. + health_check_service_name_ = std::move(health_check_service_name); + } + // Caller must be holding the control-plane combiner. ConnectedSubchannel* connected_subchannel() const { return connected_subchannel_.get(); @@ -904,8 +945,11 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { WatcherWrapper( UniquePtr watcher, - RefCountedPtr parent) - : watcher_(std::move(watcher)), parent_(std::move(parent)) {} + RefCountedPtr parent, + grpc_connectivity_state initial_state) + : watcher_(std::move(watcher)), + parent_(std::move(parent)), + last_seen_state_(initial_state) {} ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); } @@ -928,9 +972,21 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { } grpc_pollset_set* interested_parties() override { - return watcher_->interested_parties(); + SubchannelInterface::ConnectivityStateWatcherInterface* watcher = + watcher_.get(); + if (watcher_ == nullptr) watcher = replacement_->watcher_.get(); + return watcher->interested_parties(); } + WatcherWrapper* MakeReplacement() { + auto* replacement = + New(std::move(watcher_), parent_, last_seen_state_); + replacement_ = replacement; + return replacement; + } + + grpc_connectivity_state last_seen_state() const { return last_seen_state_; } + private: class Updater { public: @@ -954,12 +1010,17 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { gpr_log(GPR_INFO, "chand=%p: processing connectivity change in combiner " "for subchannel wrapper %p subchannel %p " - "(connected_subchannel=%p state=%s)", + "(connected_subchannel=%p state=%s): watcher=%p", self->parent_->parent_->chand_, self->parent_->parent_.get(), self->parent_->parent_->subchannel_, self->connected_subchannel_.get(), - grpc_connectivity_state_name(self->state_)); + grpc_connectivity_state_name(self->state_), + self->parent_->watcher_.get()); } + // Ignore update if the parent WatcherWrapper has been replaced + // since this callback was scheduled. + if (self->parent_->watcher_ == nullptr) return; + self->parent_->last_seen_state_ = self->state_; self->parent_->parent_->MaybeUpdateConnectedSubchannel( std::move(self->connected_subchannel_)); self->parent_->watcher_->OnConnectivityStateChange(self->state_); @@ -974,6 +1035,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { UniquePtr watcher_; RefCountedPtr parent_; + grpc_connectivity_state last_seen_state_; + WatcherWrapper* replacement_ = nullptr; }; void MaybeUpdateConnectedSubchannel( @@ -1655,6 +1718,11 @@ bool ChannelData::ProcessResolverResultLocked( } else { chand->health_check_service_name_.reset(); } + // Update health check service name used by existing subchannel wrappers. + for (const auto& p : chand->subchannel_wrappers_) { + p.first->UpdateHealthCheckServiceName( + UniquePtr(gpr_strdup(chand->health_check_service_name_.get()))); + } // Save service config. chand->saved_service_config_ = std::move(service_config); } diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index ceab7506729..a0af46570d4 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -42,6 +42,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/server_address.h" +#include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/debug_location.h" @@ -144,9 +145,11 @@ class FakeResolverResponseGeneratorWrapper { response_generator_ = std::move(other.response_generator_); } - void SetNextResolution(const std::vector& ports) { + void SetNextResolution(const std::vector& ports, + const char* service_config_json = nullptr) { grpc_core::ExecCtx exec_ctx; - response_generator_->SetResponse(BuildFakeResults(ports)); + response_generator_->SetResponse( + BuildFakeResults(ports, service_config_json)); } void SetNextResolutionUponError(const std::vector& ports) { @@ -165,7 +168,8 @@ class FakeResolverResponseGeneratorWrapper { private: static grpc_core::Resolver::Result BuildFakeResults( - const std::vector& ports) { + const std::vector& ports, + const char* service_config_json = nullptr) { grpc_core::Resolver::Result result; for (const int& port : ports) { char* lb_uri_str; @@ -179,6 +183,11 @@ class FakeResolverResponseGeneratorWrapper { grpc_uri_destroy(lb_uri); gpr_free(lb_uri_str); } + if (service_config_json != nullptr) { + result.service_config = grpc_core::ServiceConfig::Create( + service_config_json, &result.service_config_error); + GPR_ASSERT(result.service_config != nullptr); + } return result; } @@ -1465,6 +1474,34 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) { EnableDefaultHealthCheckService(false); } +TEST_F(ClientLbEnd2endTest, + RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) { + EnableDefaultHealthCheckService(true); + // Start server. + const int kNumServers = 1; + StartServers(kNumServers); + // Create a channel with health-checking enabled. + const char* kServiceConfigJson = + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name\"}}"; + auto response_generator = BuildResolverResponseGenerator(); + auto channel = BuildChannel("round_robin", response_generator); + auto stub = BuildStub(channel); + std::vector ports = GetServersPorts(); + response_generator.SetNextResolution(ports, kServiceConfigJson); + servers_[0]->SetServingStatus("health_check_service_name", true); + EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */)); + // Send an update on the channel to change it to use a health checking + // service name that is not being reported as healthy. + const char* kServiceConfigJson2 = + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name2\"}}"; + response_generator.SetNextResolution(ports, kServiceConfigJson2); + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + // Clean up. + EnableDefaultHealthCheckService(false); +} + TEST_F(ClientLbEnd2endTest, ChannelIdleness) { // Start server. const int kNumServers = 1;