Apply health check service name changes to existing subchannels.

pull/20039/head
Mark D. Roth 5 years ago
parent 76ac4595e3
commit c8430023a5
  1. src/core/ext/filters/client_channel/client_channel.cc (90 changed lines)
  2. test/cpp/end2end/client_lb_end2end_test.cc (43 changed lines)

--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -273,6 +273,12 @@ class ChannelData {
   bool received_first_resolver_result_ = false;
   // The number of SubchannelWrapper instances referencing a given Subchannel.
   Map<Subchannel*, int> subchannel_refcount_map_;
+  // The set of SubchannelWrappers that currently exist.
+  // No need to hold a ref, since the map is updated in the control-plane
+  // combiner when the SubchannelWrappers are created and destroyed.
+  // TODO(roth): We really want to use a set here, not a map. Since we don't
+  // currently have a set implementation, we use a map and ignore the value.
+  Map<SubchannelWrapper*, bool> subchannel_wrappers_;
   // Pending ConnectedSubchannel updates for each SubchannelWrapper.
   // Updates are queued here in the control plane combiner and then applied
   // in the data plane combiner when the picker is updated.
@@ -799,14 +805,14 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
     auto* subchannel_node = subchannel_->channelz_node();
     if (subchannel_node != nullptr) {
-      intptr_t subchannel_uuid = subchannel_node->uuid();
       auto it = chand_->subchannel_refcount_map_.find(subchannel_);
       if (it == chand_->subchannel_refcount_map_.end()) {
-        chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
+        chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
         it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
       }
       ++it->second;
     }
+    chand_->subchannel_wrappers_[this] = true;
   }

   ~SubchannelWrapper() {
@@ -815,14 +821,14 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
               "chand=%p: destroying subchannel wrapper %p for subchannel %p",
               chand_, this, subchannel_);
     }
+    chand_->subchannel_wrappers_.erase(this);
     auto* subchannel_node = subchannel_->channelz_node();
     if (subchannel_node != nullptr) {
-      intptr_t subchannel_uuid = subchannel_node->uuid();
       auto it = chand_->subchannel_refcount_map_.find(subchannel_);
       GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
       --it->second;
       if (it->second == 0) {
-        chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
+        chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
         chand_->subchannel_refcount_map_.erase(it);
       }
     }
@@ -844,8 +850,9 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
       UniquePtr<ConnectivityStateWatcherInterface> watcher) override {
     auto& watcher_wrapper = watcher_map_[watcher.get()];
     GPR_ASSERT(watcher_wrapper == nullptr);
-    watcher_wrapper = New<WatcherWrapper>(
-        std::move(watcher), Ref(DEBUG_LOCATION, "WatcherWrapper"));
+    watcher_wrapper = New<WatcherWrapper>(std::move(watcher),
+                                          Ref(DEBUG_LOCATION, "WatcherWrapper"),
+                                          initial_state);
     subchannel_->WatchConnectivityState(
         initial_state,
         UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
@@ -870,6 +877,40 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     return subchannel_->channel_args();
   }

+  void UpdateHealthCheckServiceName(UniquePtr<char> health_check_service_name) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p: subchannel wrapper %p: updating health check service "
+              "name from \"%s\" to \"%s\"",
+              chand_, this, health_check_service_name_.get(),
+              health_check_service_name.get());
+    }
+    for (auto& p : watcher_map_) {
+      WatcherWrapper*& watcher_wrapper = p.second;
+      // Cancel the current watcher and create a new one using the new
+      // health check service name.
+      // TODO(roth): If there is not already an existing health watch
+      // call for the new name, then the watcher will initially report
+      // state CONNECTING. If the LB policy is currently reporting
+      // state READY, this may cause it to switch to CONNECTING before
+      // switching back to READY. This could cause a small delay for
+      // RPCs being started on the channel. If/when this becomes a
+      // problem, we may be able to handle it by waiting for the new
+      // watcher to report READY before we use it to replace the old one.
+      WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
+      subchannel_->CancelConnectivityStateWatch(
+          health_check_service_name_.get(), watcher_wrapper);
+      watcher_wrapper = replacement;
+      subchannel_->WatchConnectivityState(
+          replacement->last_seen_state(),
+          UniquePtr<char>(gpr_strdup(health_check_service_name.get())),
+          OrphanablePtr<Subchannel::ConnectivityStateWatcherInterface>(
+              replacement));
+    }
+    // Save the new health check service name.
+    health_check_service_name_ = std::move(health_check_service_name);
+  }
+
   // Caller must be holding the control-plane combiner.
   ConnectedSubchannel* connected_subchannel() const {
     return connected_subchannel_.get();
@@ -904,8 +945,11 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     WatcherWrapper(
         UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface>
             watcher,
-        RefCountedPtr<SubchannelWrapper> parent)
-        : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
+        RefCountedPtr<SubchannelWrapper> parent,
+        grpc_connectivity_state initial_state)
+        : watcher_(std::move(watcher)),
+          parent_(std::move(parent)),
+          last_seen_state_(initial_state) {}

     ~WatcherWrapper() { parent_.reset(DEBUG_LOCATION, "WatcherWrapper"); }
@@ -928,9 +972,21 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     }

     grpc_pollset_set* interested_parties() override {
-      return watcher_->interested_parties();
+      SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
+          watcher_.get();
+      if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
+      return watcher->interested_parties();
     }

+    WatcherWrapper* MakeReplacement() {
+      auto* replacement =
+          New<WatcherWrapper>(std::move(watcher_), parent_, last_seen_state_);
+      replacement_ = replacement;
+      return replacement;
+    }
+
+    grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
+
    private:
     class Updater {
      public:
@@ -954,12 +1010,17 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
           gpr_log(GPR_INFO,
                   "chand=%p: processing connectivity change in combiner "
                   "for subchannel wrapper %p subchannel %p "
-                  "(connected_subchannel=%p state=%s)",
+                  "(connected_subchannel=%p state=%s): watcher=%p",
                   self->parent_->parent_->chand_, self->parent_->parent_.get(),
                   self->parent_->parent_->subchannel_,
                   self->connected_subchannel_.get(),
-                  grpc_connectivity_state_name(self->state_));
+                  grpc_connectivity_state_name(self->state_),
+                  self->parent_->watcher_.get());
         }
+        // Ignore update if the parent WatcherWrapper has been replaced
+        // since this callback was scheduled.
+        if (self->parent_->watcher_ == nullptr) return;
+        self->parent_->last_seen_state_ = self->state_;
         self->parent_->parent_->MaybeUpdateConnectedSubchannel(
             std::move(self->connected_subchannel_));
         self->parent_->watcher_->OnConnectivityStateChange(self->state_);
@@ -974,6 +1035,8 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
     UniquePtr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher_;
     RefCountedPtr<SubchannelWrapper> parent_;
+    grpc_connectivity_state last_seen_state_;
+    WatcherWrapper* replacement_ = nullptr;
   };

   void MaybeUpdateConnectedSubchannel(
@@ -1655,6 +1718,11 @@ bool ChannelData::ProcessResolverResultLocked(
     } else {
       chand->health_check_service_name_.reset();
     }
+    // Update health check service name used by existing subchannel wrappers.
+    for (const auto& p : chand->subchannel_wrappers_) {
+      p.first->UpdateHealthCheckServiceName(
+          UniquePtr<char>(gpr_strdup(chand->health_check_service_name_.get())));
+    }
     // Save service config.
     chand->saved_service_config_ = std::move(service_config);
   }
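The service config that populates chand->health_check_service_name_ above normally comes from the resolver, but an application can also supply one through channel arguments. As a rough sketch (not taken from this commit; the helper name, target, and service name are illustrative), a C++ client could opt into client-side health checking like this, with the JSON acting only as a fallback when the resolver returns no service config of its own:

#include <memory>
#include <string>

#include <grpcpp/grpcpp.h>

// Builds a channel whose default service config enables client-side health
// checking for the given health-checking service name.
std::shared_ptr<grpc::Channel> MakeHealthCheckedChannel(
    const std::string& target, const std::string& health_check_service_name) {
  grpc::ChannelArguments args;
  // Used only if the resolver does not return its own service config.
  args.SetServiceConfigJSON("{\"healthCheckConfig\": {\"serviceName\": \"" +
                            health_check_service_name + "\"}}");
  return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(),
                                   args);
}

With the change above, if a later resolver update carries a different serviceName, the channel re-points the health watches of its existing subchannel wrappers instead of leaving them on the old name.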

--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -42,6 +42,7 @@
 #include "src/core/ext/filters/client_channel/parse_address.h"
 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
 #include "src/core/ext/filters/client_channel/server_address.h"
+#include "src/core/ext/filters/client_channel/service_config.h"
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gprpp/debug_location.h"
@@ -144,9 +145,11 @@ class FakeResolverResponseGeneratorWrapper {
     response_generator_ = std::move(other.response_generator_);
   }

-  void SetNextResolution(const std::vector<int>& ports) {
+  void SetNextResolution(const std::vector<int>& ports,
+                         const char* service_config_json = nullptr) {
     grpc_core::ExecCtx exec_ctx;
-    response_generator_->SetResponse(BuildFakeResults(ports));
+    response_generator_->SetResponse(
+        BuildFakeResults(ports, service_config_json));
   }

   void SetNextResolutionUponError(const std::vector<int>& ports) {
@@ -165,7 +168,8 @@ class FakeResolverResponseGeneratorWrapper {

  private:
   static grpc_core::Resolver::Result BuildFakeResults(
-      const std::vector<int>& ports) {
+      const std::vector<int>& ports,
+      const char* service_config_json = nullptr) {
     grpc_core::Resolver::Result result;
     for (const int& port : ports) {
       char* lb_uri_str;
@@ -179,6 +183,11 @@ class FakeResolverResponseGeneratorWrapper {
       grpc_uri_destroy(lb_uri);
       gpr_free(lb_uri_str);
     }
+    if (service_config_json != nullptr) {
+      result.service_config = grpc_core::ServiceConfig::Create(
+          service_config_json, &result.service_config_error);
+      GPR_ASSERT(result.service_config != nullptr);
+    }
     return result;
   }
@@ -1465,6 +1474,34 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
   EnableDefaultHealthCheckService(false);
 }

+TEST_F(ClientLbEnd2endTest,
+       RoundRobinWithHealthCheckingServiceNameChangesAfterSubchannelsCreated) {
+  EnableDefaultHealthCheckService(true);
+  // Start server.
+  const int kNumServers = 1;
+  StartServers(kNumServers);
+  // Create a channel with health-checking enabled.
+  const char* kServiceConfigJson =
+      "{\"healthCheckConfig\": "
+      "{\"serviceName\": \"health_check_service_name\"}}";
+  auto response_generator = BuildResolverResponseGenerator();
+  auto channel = BuildChannel("round_robin", response_generator);
+  auto stub = BuildStub(channel);
+  std::vector<int> ports = GetServersPorts();
+  response_generator.SetNextResolution(ports, kServiceConfigJson);
+  servers_[0]->SetServingStatus("health_check_service_name", true);
+  EXPECT_TRUE(WaitForChannelReady(channel.get(), 1 /* timeout_seconds */));
+  // Send an update on the channel to change it to use a health checking
+  // service name that is not being reported as healthy.
+  const char* kServiceConfigJson2 =
+      "{\"healthCheckConfig\": "
+      "{\"serviceName\": \"health_check_service_name2\"}}";
+  response_generator.SetNextResolution(ports, kServiceConfigJson2);
+  EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+  // Clean up.
+  EnableDefaultHealthCheckService(false);
+}
+
 TEST_F(ClientLbEnd2endTest, ChannelIdleness) {
   // Start server.
   const int kNumServers = 1;
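The new test drives per-service health status through the fixture's SetServingStatus() helper; on a real server the equivalent is the default health check service. A minimal sketch (not part of this commit; the free-function wrapper is illustrative), assuming grpc::EnableDefaultHealthCheckService(true) was called before the server was built:

#include <string>

#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>

// Marks one health-checking service name as serving or not serving on a
// running server. GetHealthCheckService() returns nullptr unless the default
// health check service was enabled before ServerBuilder::BuildAndStart().
void MarkServingStatus(grpc::Server* server, const std::string& service_name,
                       bool serving) {
  grpc::HealthCheckServiceInterface* health_service =
      server->GetHealthCheckService();
  if (health_service != nullptr) {
    health_service->SetServingStatus(service_name, serving);
  }
}

Flipping the status of the name a channel is watching, or, as in the test, switching the channel to a name that was never marked serving, is what moves the channel out of READY.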
