Merge pull request #25040 from markdroth/c++_health_check_service_name

Use std::string for health check service name.
pull/25091/head
Mark D. Roth 4 years ago committed by GitHub
commit f159099778
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      src/core/ext/filters/client_channel/client_channel.cc
  2. 9
      src/core/ext/filters/client_channel/health/health_check_client.cc
  3. 4
      src/core/ext/filters/client_channel/health/health_check_client.h
  4. 24
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  5. 16
      src/core/ext/filters/client_channel/resolver_result_parsing.h
  6. 46
      src/core/ext/filters/client_channel/subchannel.cc
  7. 17
      src/core/ext/filters/client_channel/subchannel.h
  8. 4
      test/core/client_channel/service_config_test.cc

@ -364,7 +364,7 @@ class ChannelData {
bool previous_resolution_contained_addresses_ = false; bool previous_resolution_contained_addresses_ = false;
RefCountedPtr<ServiceConfig> saved_service_config_; RefCountedPtr<ServiceConfig> saved_service_config_;
RefCountedPtr<ConfigSelector> saved_config_selector_; RefCountedPtr<ConfigSelector> saved_config_selector_;
UniquePtr<char> health_check_service_name_; absl::optional<std::string> health_check_service_name_;
OrphanablePtr<LoadBalancingPolicy> lb_policy_; OrphanablePtr<LoadBalancingPolicy> lb_policy_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
// The number of SubchannelWrapper instances referencing a given Subchannel. // The number of SubchannelWrapper instances referencing a given Subchannel.
@ -1260,7 +1260,7 @@ grpc_error* DynamicTerminationFilterChannelData::Init(
class ChannelData::SubchannelWrapper : public SubchannelInterface { class ChannelData::SubchannelWrapper : public SubchannelInterface {
public: public:
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
UniquePtr<char> health_check_service_name) absl::optional<std::string> health_check_service_name)
: SubchannelInterface( : SubchannelInterface(
GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
? "SubchannelWrapper" ? "SubchannelWrapper"
@ -1310,7 +1310,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
grpc_connectivity_state CheckConnectivityState() override { grpc_connectivity_state CheckConnectivityState() override {
RefCountedPtr<ConnectedSubchannel> connected_subchannel; RefCountedPtr<ConnectedSubchannel> connected_subchannel;
grpc_connectivity_state connectivity_state = grpc_connectivity_state connectivity_state =
subchannel_->CheckConnectivityState(health_check_service_name_.get(), subchannel_->CheckConnectivityState(health_check_service_name_,
&connected_subchannel); &connected_subchannel);
MaybeUpdateConnectedSubchannel(std::move(connected_subchannel)); MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
return connectivity_state; return connectivity_state;
@ -1325,8 +1325,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
Ref(DEBUG_LOCATION, "WatcherWrapper"), Ref(DEBUG_LOCATION, "WatcherWrapper"),
initial_state); initial_state);
subchannel_->WatchConnectivityState( subchannel_->WatchConnectivityState(
initial_state, initial_state, health_check_service_name_,
UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>( RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
watcher_wrapper)); watcher_wrapper));
} }
@ -1335,7 +1334,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
ConnectivityStateWatcherInterface* watcher) override { ConnectivityStateWatcherInterface* watcher) override {
auto it = watcher_map_.find(watcher); auto it = watcher_map_.find(watcher);
GPR_ASSERT(it != watcher_map_.end()); GPR_ASSERT(it != watcher_map_.end());
subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
it->second); it->second);
watcher_map_.erase(it); watcher_map_.erase(it);
} }
@ -1352,13 +1351,14 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
subchannel_->ThrottleKeepaliveTime(new_keepalive_time); subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
} }
void UpdateHealthCheckServiceName(UniquePtr<char> health_check_service_name) { void UpdateHealthCheckServiceName(
absl::optional<std::string> health_check_service_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: subchannel wrapper %p: updating health check service " "chand=%p: subchannel wrapper %p: updating health check service "
"name from \"%s\" to \"%s\"", "name from \"%s\" to \"%s\"",
chand_, this, health_check_service_name_.get(), chand_, this, health_check_service_name_->c_str(),
health_check_service_name.get()); health_check_service_name->c_str());
} }
for (auto& p : watcher_map_) { for (auto& p : watcher_map_) {
WatcherWrapper*& watcher_wrapper = p.second; WatcherWrapper*& watcher_wrapper = p.second;
@ -1373,12 +1373,11 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
// problem, we may be able to handle it by waiting for the new // problem, we may be able to handle it by waiting for the new
// watcher to report READY before we use it to replace the old one. // watcher to report READY before we use it to replace the old one.
WatcherWrapper* replacement = watcher_wrapper->MakeReplacement(); WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
subchannel_->CancelConnectivityStateWatch( subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
health_check_service_name_.get(), watcher_wrapper); watcher_wrapper);
watcher_wrapper = replacement; watcher_wrapper = replacement;
subchannel_->WatchConnectivityState( subchannel_->WatchConnectivityState(
replacement->last_seen_state(), replacement->last_seen_state(), health_check_service_name,
UniquePtr<char>(gpr_strdup(health_check_service_name.get())),
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>( RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
replacement)); replacement));
} }
@ -1542,7 +1541,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
ChannelData* chand_; ChannelData* chand_;
Subchannel* subchannel_; Subchannel* subchannel_;
UniquePtr<char> health_check_service_name_; absl::optional<std::string> health_check_service_name_;
// Maps from the address of the watcher passed to us by the LB policy // Maps from the address of the watcher passed to us by the LB policy
// to the address of the WrapperWatcher that we passed to the underlying // to the address of the WrapperWatcher that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls // subchannel. This is needed so that when the LB policy calls
@ -1735,10 +1734,9 @@ class ChannelData::ClientChannelControlHelper
// Determine health check service name. // Determine health check service name.
bool inhibit_health_checking = grpc_channel_arg_get_bool( bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
UniquePtr<char> health_check_service_name; absl::optional<std::string> health_check_service_name;
if (!inhibit_health_checking) { if (!inhibit_health_checking) {
health_check_service_name.reset( health_check_service_name = chand_->health_check_service_name_;
gpr_strdup(chand_->health_check_service_name_.get()));
} }
// Remove channel args that should not affect subchannel uniqueness. // Remove channel args that should not affect subchannel uniqueness.
static const char* args_to_remove[] = { static const char* args_to_remove[] = {
@ -2252,17 +2250,14 @@ void ChannelData::UpdateServiceConfigInControlPlaneLocked(
// Save service config. // Save service config.
saved_service_config_ = std::move(service_config); saved_service_config_ = std::move(service_config);
// Update health check service name if needed. // Update health check service name if needed.
if (((health_check_service_name_ == nullptr) != if (health_check_service_name_ !=
(parsed_service_config->health_check_service_name() == nullptr)) || parsed_service_config->health_check_service_name()) {
(health_check_service_name_ != nullptr && health_check_service_name_ =
strcmp(health_check_service_name_.get(), parsed_service_config->health_check_service_name();
parsed_service_config->health_check_service_name()) != 0)) {
health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
// Update health check service name used by existing subchannel wrappers. // Update health check service name used by existing subchannel wrappers.
for (auto* subchannel_wrapper : subchannel_wrappers_) { for (auto* subchannel_wrapper : subchannel_wrappers_) {
subchannel_wrapper->UpdateHealthCheckServiceName( subchannel_wrapper->UpdateHealthCheckServiceName(
UniquePtr<char>(gpr_strdup(health_check_service_name_.get()))); health_check_service_name_);
} }
} }
// Swap out the data used by GetChannelInfo(). // Swap out the data used by GetChannelInfo().
@ -2399,7 +2394,6 @@ void ChannelData::UpdateStateAndPickerLocked(
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) { std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
// Clean the control plane when entering IDLE. // Clean the control plane when entering IDLE.
if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) { if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
health_check_service_name_.reset();
saved_service_config_.reset(); saved_service_config_.reset();
saved_config_selector_.reset(); saved_config_selector_.reset();
} }

@ -46,7 +46,7 @@ TraceFlag grpc_health_check_client_trace(false, "health_check_client");
// //
HealthCheckClient::HealthCheckClient( HealthCheckClient::HealthCheckClient(
const char* service_name, std::string service_name,
RefCountedPtr<ConnectedSubchannel> connected_subchannel, RefCountedPtr<ConnectedSubchannel> connected_subchannel,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
RefCountedPtr<channelz::SubchannelNode> channelz_node, RefCountedPtr<channelz::SubchannelNode> channelz_node,
@ -55,7 +55,7 @@ HealthCheckClient::HealthCheckClient(
GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace) GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
? "HealthCheckClient" ? "HealthCheckClient"
: nullptr), : nullptr),
service_name_(service_name), service_name_(std::move(service_name)),
connected_subchannel_(std::move(connected_subchannel)), connected_subchannel_(std::move(connected_subchannel)),
interested_parties_(interested_parties), interested_parties_(interested_parties),
channelz_node_(std::move(channelz_node)), channelz_node_(std::move(channelz_node)),
@ -180,13 +180,14 @@ void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
namespace { namespace {
void EncodeRequest(const char* service_name, void EncodeRequest(const std::string& service_name,
ManualConstructor<SliceBufferByteStream>* send_message) { ManualConstructor<SliceBufferByteStream>* send_message) {
upb::Arena arena; upb::Arena arena;
grpc_health_v1_HealthCheckRequest* request_struct = grpc_health_v1_HealthCheckRequest* request_struct =
grpc_health_v1_HealthCheckRequest_new(arena.ptr()); grpc_health_v1_HealthCheckRequest_new(arena.ptr());
grpc_health_v1_HealthCheckRequest_set_service( grpc_health_v1_HealthCheckRequest_set_service(
request_struct, upb_strview_makez(service_name)); request_struct,
upb_strview_make(service_name.data(), service_name.size()));
size_t buf_length; size_t buf_length;
char* buf = grpc_health_v1_HealthCheckRequest_serialize( char* buf = grpc_health_v1_HealthCheckRequest_serialize(
request_struct, arena.ptr(), &buf_length); request_struct, arena.ptr(), &buf_length);

@ -44,7 +44,7 @@ namespace grpc_core {
class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> { class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
public: public:
HealthCheckClient(const char* service_name, HealthCheckClient(std::string service_name,
RefCountedPtr<ConnectedSubchannel> connected_subchannel, RefCountedPtr<ConnectedSubchannel> connected_subchannel,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
RefCountedPtr<channelz::SubchannelNode> channelz_node, RefCountedPtr<channelz::SubchannelNode> channelz_node,
@ -150,7 +150,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
void SetHealthStatusLocked(grpc_connectivity_state state, void SetHealthStatusLocked(grpc_connectivity_state state,
const char* reason); // Requires holding mu_. const char* reason); // Requires holding mu_.
const char* service_name_; // Do not own. std::string service_name_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_; RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_pollset_set* interested_parties_; // Do not own. grpc_pollset_set* interested_parties_; // Do not own.
RefCountedPtr<channelz::SubchannelNode> channelz_node_; RefCountedPtr<channelz::SubchannelNode> channelz_node_;

@ -248,27 +248,25 @@ grpc_error* ParseRetryThrottling(
return GRPC_ERROR_CREATE_FROM_VECTOR("retryPolicy", &error_list); return GRPC_ERROR_CREATE_FROM_VECTOR("retryPolicy", &error_list);
} }
const char* ParseHealthCheckConfig(const Json& field, grpc_error** error) { absl::optional<std::string> ParseHealthCheckConfig(const Json& field,
grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
const char* service_name = nullptr;
if (field.type() != Json::Type::OBJECT) { if (field.type() != Json::Type::OBJECT) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:healthCheckConfig error:should be of type object"); "field:healthCheckConfig error:should be of type object");
return nullptr; return absl::nullopt;
} }
std::vector<grpc_error*> error_list; std::vector<grpc_error*> error_list;
absl::optional<std::string> service_name;
auto it = field.object_value().find("serviceName"); auto it = field.object_value().find("serviceName");
if (it != field.object_value().end()) { if (it != field.object_value().end()) {
if (it->second.type() != Json::Type::STRING) { if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:serviceName error:should be of type string")); "field:serviceName error:should be of type string"));
} else { } else {
service_name = it->second.string_value().c_str(); service_name = it->second.string_value();
} }
} }
if (!error_list.empty()) {
return nullptr;
}
*error = *error =
GRPC_ERROR_CREATE_FROM_VECTOR("field:healthCheckConfig", &error_list); GRPC_ERROR_CREATE_FROM_VECTOR("field:healthCheckConfig", &error_list);
return service_name; return service_name;
@ -281,12 +279,8 @@ ClientChannelServiceConfigParser::ParseGlobalParams(
const grpc_channel_args* /*args*/, const Json& json, grpc_error** error) { const grpc_channel_args* /*args*/, const Json& json, grpc_error** error) {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
std::vector<grpc_error*> error_list; std::vector<grpc_error*> error_list;
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config;
std::string lb_policy_name;
absl::optional<ClientChannelGlobalParsedConfig::RetryThrottling>
retry_throttling;
const char* health_check_service_name = nullptr;
// Parse LB config. // Parse LB config.
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config;
auto it = json.object_value().find("loadBalancingConfig"); auto it = json.object_value().find("loadBalancingConfig");
if (it != json.object_value().end()) { if (it != json.object_value().end()) {
grpc_error* parse_error = GRPC_ERROR_NONE; grpc_error* parse_error = GRPC_ERROR_NONE;
@ -300,6 +294,7 @@ ClientChannelServiceConfigParser::ParseGlobalParams(
} }
} }
// Parse deprecated LB policy. // Parse deprecated LB policy.
std::string lb_policy_name;
it = json.object_value().find("loadBalancingPolicy"); it = json.object_value().find("loadBalancingPolicy");
if (it != json.object_value().end()) { if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) { if (it->second.type() != Json::Type::STRING) {
@ -325,6 +320,8 @@ ClientChannelServiceConfigParser::ParseGlobalParams(
} }
} }
// Parse retry throttling. // Parse retry throttling.
absl::optional<ClientChannelGlobalParsedConfig::RetryThrottling>
retry_throttling;
it = json.object_value().find("retryThrottling"); it = json.object_value().find("retryThrottling");
if (it != json.object_value().end()) { if (it != json.object_value().end()) {
ClientChannelGlobalParsedConfig::RetryThrottling data; ClientChannelGlobalParsedConfig::RetryThrottling data;
@ -336,6 +333,7 @@ ClientChannelServiceConfigParser::ParseGlobalParams(
} }
} }
// Parse health check config. // Parse health check config.
absl::optional<std::string> health_check_service_name;
it = json.object_value().find("healthCheckConfig"); it = json.object_value().find("healthCheckConfig");
if (it != json.object_value().end()) { if (it != json.object_value().end()) {
grpc_error* parsing_error = GRPC_ERROR_NONE; grpc_error* parsing_error = GRPC_ERROR_NONE;
@ -350,7 +348,7 @@ ClientChannelServiceConfigParser::ParseGlobalParams(
if (*error == GRPC_ERROR_NONE) { if (*error == GRPC_ERROR_NONE) {
return absl::make_unique<ClientChannelGlobalParsedConfig>( return absl::make_unique<ClientChannelGlobalParsedConfig>(
std::move(parsed_lb_config), std::move(lb_policy_name), std::move(parsed_lb_config), std::move(lb_policy_name),
retry_throttling, health_check_service_name); retry_throttling, std::move(health_check_service_name));
} }
return nullptr; return nullptr;
} }

@ -49,15 +49,11 @@ class ClientChannelGlobalParsedConfig
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config, RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config,
std::string parsed_deprecated_lb_policy, std::string parsed_deprecated_lb_policy,
const absl::optional<RetryThrottling>& retry_throttling, const absl::optional<RetryThrottling>& retry_throttling,
const char* health_check_service_name) absl::optional<std::string> health_check_service_name)
: parsed_lb_config_(std::move(parsed_lb_config)), : parsed_lb_config_(std::move(parsed_lb_config)),
parsed_deprecated_lb_policy_(std::move(parsed_deprecated_lb_policy)), parsed_deprecated_lb_policy_(std::move(parsed_deprecated_lb_policy)),
retry_throttling_(retry_throttling), retry_throttling_(retry_throttling),
health_check_service_name_(health_check_service_name) {} health_check_service_name_(std::move(health_check_service_name)) {}
absl::optional<RetryThrottling> retry_throttling() const {
return retry_throttling_;
}
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config() const { RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config() const {
return parsed_lb_config_; return parsed_lb_config_;
@ -67,7 +63,11 @@ class ClientChannelGlobalParsedConfig
return parsed_deprecated_lb_policy_; return parsed_deprecated_lb_policy_;
} }
const char* health_check_service_name() const { absl::optional<RetryThrottling> retry_throttling() const {
return retry_throttling_;
}
const absl::optional<std::string>& health_check_service_name() const {
return health_check_service_name_; return health_check_service_name_;
} }
@ -75,7 +75,7 @@ class ClientChannelGlobalParsedConfig
RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config_; RefCountedPtr<LoadBalancingPolicy::Config> parsed_lb_config_;
std::string parsed_deprecated_lb_policy_; std::string parsed_deprecated_lb_policy_;
absl::optional<RetryThrottling> retry_throttling_; absl::optional<RetryThrottling> retry_throttling_;
const char* health_check_service_name_; absl::optional<std::string> health_check_service_name_;
}; };
class ClientChannelMethodParsedConfig class ClientChannelMethodParsedConfig

@ -424,8 +424,7 @@ void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
class Subchannel::HealthWatcherMap::HealthWatcher class Subchannel::HealthWatcherMap::HealthWatcher
: public AsyncConnectivityStateWatcherInterface { : public AsyncConnectivityStateWatcherInterface {
public: public:
HealthWatcher(Subchannel* c, HealthWatcher(Subchannel* c, std::string health_check_service_name,
grpc_core::UniquePtr<char> health_check_service_name,
grpc_connectivity_state subchannel_state) grpc_connectivity_state subchannel_state)
: subchannel_(c), : subchannel_(c),
health_check_service_name_(std::move(health_check_service_name)), health_check_service_name_(std::move(health_check_service_name)),
@ -440,8 +439,8 @@ class Subchannel::HealthWatcherMap::HealthWatcher
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher");
} }
const char* health_check_service_name() const { const std::string& health_check_service_name() const {
return health_check_service_name_.get(); return health_check_service_name_;
} }
grpc_connectivity_state state() const { return state_; } grpc_connectivity_state state() const { return state_; }
@ -504,12 +503,12 @@ class Subchannel::HealthWatcherMap::HealthWatcher
void StartHealthCheckingLocked() { void StartHealthCheckingLocked() {
GPR_ASSERT(health_check_client_ == nullptr); GPR_ASSERT(health_check_client_ == nullptr);
health_check_client_ = MakeOrphanable<HealthCheckClient>( health_check_client_ = MakeOrphanable<HealthCheckClient>(
health_check_service_name_.get(), subchannel_->connected_subchannel_, health_check_service_name_, subchannel_->connected_subchannel_,
subchannel_->pollset_set_, subchannel_->channelz_node_, Ref()); subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
} }
Subchannel* subchannel_; Subchannel* subchannel_;
grpc_core::UniquePtr<char> health_check_service_name_; std::string health_check_service_name_;
OrphanablePtr<HealthCheckClient> health_check_client_; OrphanablePtr<HealthCheckClient> health_check_client_;
grpc_connectivity_state state_; grpc_connectivity_state state_;
absl::Status status_; absl::Status status_;
@ -522,18 +521,17 @@ class Subchannel::HealthWatcherMap::HealthWatcher
void Subchannel::HealthWatcherMap::AddWatcherLocked( void Subchannel::HealthWatcherMap::AddWatcherLocked(
Subchannel* subchannel, grpc_connectivity_state initial_state, Subchannel* subchannel, grpc_connectivity_state initial_state,
grpc_core::UniquePtr<char> health_check_service_name, const std::string& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
// If the health check service name is not already present in the map, // If the health check service name is not already present in the map,
// add it. // add it.
auto it = map_.find(health_check_service_name.get()); auto it = map_.find(health_check_service_name);
HealthWatcher* health_watcher; HealthWatcher* health_watcher;
if (it == map_.end()) { if (it == map_.end()) {
const char* key = health_check_service_name.get();
auto w = MakeOrphanable<HealthWatcher>( auto w = MakeOrphanable<HealthWatcher>(
subchannel, std::move(health_check_service_name), subchannel->state_); subchannel, health_check_service_name, subchannel->state_);
health_watcher = w.get(); health_watcher = w.get();
map_[key] = std::move(w); map_.emplace(health_check_service_name, std::move(w));
} else { } else {
health_watcher = it->second.get(); health_watcher = it->second.get();
} }
@ -542,7 +540,7 @@ void Subchannel::HealthWatcherMap::AddWatcherLocked(
} }
void Subchannel::HealthWatcherMap::RemoveWatcherLocked( void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
const char* health_check_service_name, const std::string& health_check_service_name,
ConnectivityStateWatcherInterface* watcher) { ConnectivityStateWatcherInterface* watcher) {
auto it = map_.find(health_check_service_name); auto it = map_.find(health_check_service_name);
GPR_ASSERT(it != map_.end()); GPR_ASSERT(it != map_.end());
@ -561,7 +559,7 @@ void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
grpc_connectivity_state grpc_connectivity_state
Subchannel::HealthWatcherMap::CheckConnectivityStateLocked( Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
Subchannel* subchannel, const char* health_check_service_name) { Subchannel* subchannel, const std::string& health_check_service_name) {
auto it = map_.find(health_check_service_name); auto it = map_.find(health_check_service_name);
if (it == map_.end()) { if (it == map_.end()) {
// If the health check service name is not found in the map, we're // If the health check service name is not found in the map, we're
@ -824,15 +822,15 @@ channelz::SubchannelNode* Subchannel::channelz_node() {
} }
grpc_connectivity_state Subchannel::CheckConnectivityState( grpc_connectivity_state Subchannel::CheckConnectivityState(
const char* health_check_service_name, const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectedSubchannel>* connected_subchannel) { RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
grpc_connectivity_state state; grpc_connectivity_state state;
if (health_check_service_name == nullptr) { if (!health_check_service_name.has_value()) {
state = state_; state = state_;
} else { } else {
state = health_watcher_map_.CheckConnectivityStateLocked( state = health_watcher_map_.CheckConnectivityStateLocked(
this, health_check_service_name); this, *health_check_service_name);
} }
if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) { if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
*connected_subchannel = connected_subchannel_; *connected_subchannel = connected_subchannel_;
@ -842,37 +840,37 @@ grpc_connectivity_state Subchannel::CheckConnectivityState(
void Subchannel::WatchConnectivityState( void Subchannel::WatchConnectivityState(
grpc_connectivity_state initial_state, grpc_connectivity_state initial_state,
grpc_core::UniquePtr<char> health_check_service_name, const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties(); grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) { if (interested_parties != nullptr) {
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
} }
if (health_check_service_name == nullptr) { if (!health_check_service_name.has_value()) {
if (state_ != initial_state) { if (state_ != initial_state) {
new AsyncWatcherNotifierLocked(watcher, this, state_, status_); new AsyncWatcherNotifierLocked(watcher, this, state_, status_);
} }
watcher_list_.AddWatcherLocked(std::move(watcher)); watcher_list_.AddWatcherLocked(std::move(watcher));
} else { } else {
health_watcher_map_.AddWatcherLocked(this, initial_state, health_watcher_map_.AddWatcherLocked(
std::move(health_check_service_name), this, initial_state, *health_check_service_name, std::move(watcher));
std::move(watcher));
} }
} }
void Subchannel::CancelConnectivityStateWatch( void Subchannel::CancelConnectivityStateWatch(
const char* health_check_service_name, const absl::optional<std::string>& health_check_service_name,
ConnectivityStateWatcherInterface* watcher) { ConnectivityStateWatcherInterface* watcher) {
MutexLock lock(&mu_); MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties(); grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) { if (interested_parties != nullptr) {
grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties); grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
} }
if (health_check_service_name == nullptr) { if (!health_check_service_name.has_value()) {
watcher_list_.RemoveWatcherLocked(watcher); watcher_list_.RemoveWatcherLocked(watcher);
} else { } else {
health_watcher_map_.RemoveWatcherLocked(health_check_service_name, watcher); health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
watcher);
} }
} }

@ -251,7 +251,7 @@ class Subchannel {
// service name. // service name.
// If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel. // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
grpc_connectivity_state CheckConnectivityState( grpc_connectivity_state CheckConnectivityState(
const char* health_check_service_name, const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectedSubchannel>* connected_subchannel); RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
// Starts watching the subchannel's connectivity state. // Starts watching the subchannel's connectivity state.
@ -264,13 +264,14 @@ class Subchannel {
// destroyed or when CancelConnectivityStateWatch() is called. // destroyed or when CancelConnectivityStateWatch() is called.
void WatchConnectivityState( void WatchConnectivityState(
grpc_connectivity_state initial_state, grpc_connectivity_state initial_state,
grpc_core::UniquePtr<char> health_check_service_name, const absl::optional<std::string>& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher); RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
// Cancels a connectivity state watch. // Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op. // If the watcher has already been destroyed, this is a no-op.
void CancelConnectivityStateWatch(const char* health_check_service_name, void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher); const absl::optional<std::string>& health_check_service_name,
ConnectivityStateWatcherInterface* watcher);
// Attempt to connect to the backend. Has no effect if already connected. // Attempt to connect to the backend. Has no effect if already connected.
void AttemptToConnect(); void AttemptToConnect();
@ -334,9 +335,9 @@ class Subchannel {
public: public:
void AddWatcherLocked( void AddWatcherLocked(
Subchannel* subchannel, grpc_connectivity_state initial_state, Subchannel* subchannel, grpc_connectivity_state initial_state,
grpc_core::UniquePtr<char> health_check_service_name, const std::string& health_check_service_name,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher); RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
void RemoveWatcherLocked(const char* health_check_service_name, void RemoveWatcherLocked(const std::string& health_check_service_name,
ConnectivityStateWatcherInterface* watcher); ConnectivityStateWatcherInterface* watcher);
// Notifies the watcher when the subchannel's state changes. // Notifies the watcher when the subchannel's state changes.
@ -344,14 +345,14 @@ class Subchannel {
const absl::Status& status); const absl::Status& status);
grpc_connectivity_state CheckConnectivityStateLocked( grpc_connectivity_state CheckConnectivityStateLocked(
Subchannel* subchannel, const char* health_check_service_name); Subchannel* subchannel, const std::string& health_check_service_name);
void ShutdownLocked(); void ShutdownLocked();
private: private:
class HealthWatcher; class HealthWatcher;
std::map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_; std::map<std::string, OrphanablePtr<HealthWatcher>> map_;
}; };
class ConnectedSubchannelStateWatcher; class ConnectedSubchannelStateWatcher;

@ -991,8 +991,8 @@ TEST_F(ClientChannelParserTest, ValidHealthCheck) {
static_cast<grpc_core::internal::ClientChannelGlobalParsedConfig*>( static_cast<grpc_core::internal::ClientChannelGlobalParsedConfig*>(
svc_cfg->GetGlobalParsedConfig(0)); svc_cfg->GetGlobalParsedConfig(0));
ASSERT_NE(parsed_config, nullptr); ASSERT_NE(parsed_config, nullptr);
EXPECT_STREQ(parsed_config->health_check_service_name(), EXPECT_EQ(parsed_config->health_check_service_name(),
"health_check_service_name"); "health_check_service_name");
} }
TEST_F(ClientChannelParserTest, InvalidHealthCheckMultipleEntries) { TEST_F(ClientChannelParserTest, InvalidHealthCheckMultipleEntries) {

Loading…
Cancel
Save