|
|
@ -26,7 +26,6 @@ |
|
|
|
#include <stdio.h> |
|
|
|
#include <stdio.h> |
|
|
|
#include <string.h> |
|
|
|
#include <string.h> |
|
|
|
|
|
|
|
|
|
|
|
#include <map> |
|
|
|
|
|
|
|
#include <set> |
|
|
|
#include <set> |
|
|
|
|
|
|
|
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
#include <grpc/support/alloc.h> |
|
|
@ -57,6 +56,7 @@ |
|
|
|
#include "src/core/lib/channel/status_util.h" |
|
|
|
#include "src/core/lib/channel/status_util.h" |
|
|
|
#include "src/core/lib/gpr/string.h" |
|
|
|
#include "src/core/lib/gpr/string.h" |
|
|
|
#include "src/core/lib/gprpp/manual_constructor.h" |
|
|
|
#include "src/core/lib/gprpp/manual_constructor.h" |
|
|
|
|
|
|
|
#include "src/core/lib/gprpp/map.h" |
|
|
|
#include "src/core/lib/gprpp/sync.h" |
|
|
|
#include "src/core/lib/gprpp/sync.h" |
|
|
|
#include "src/core/lib/iomgr/iomgr.h" |
|
|
|
#include "src/core/lib/iomgr/iomgr.h" |
|
|
|
#include "src/core/lib/iomgr/polling_entity.h" |
|
|
|
#include "src/core/lib/iomgr/polling_entity.h" |
|
|
@ -295,7 +295,7 @@ class ChannelData { |
|
|
|
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; |
|
|
|
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; |
|
|
|
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_; |
|
|
|
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_; |
|
|
|
ConnectivityStateTracker state_tracker_; |
|
|
|
ConnectivityStateTracker state_tracker_; |
|
|
|
std::string health_check_service_name_; |
|
|
|
grpc_core::UniquePtr<char> health_check_service_name_; |
|
|
|
RefCountedPtr<ServiceConfig> saved_service_config_; |
|
|
|
RefCountedPtr<ServiceConfig> saved_service_config_; |
|
|
|
bool received_first_resolver_result_ = false; |
|
|
|
bool received_first_resolver_result_ = false; |
|
|
|
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
|
|
|
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
|
|
@ -850,7 +850,7 @@ class CallData { |
|
|
|
class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
public: |
|
|
|
public: |
|
|
|
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, |
|
|
|
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, |
|
|
|
std::string health_check_service_name) |
|
|
|
grpc_core::UniquePtr<char> health_check_service_name) |
|
|
|
: SubchannelInterface(&grpc_client_channel_routing_trace), |
|
|
|
: SubchannelInterface(&grpc_client_channel_routing_trace), |
|
|
|
chand_(chand), |
|
|
|
chand_(chand), |
|
|
|
subchannel_(subchannel), |
|
|
|
subchannel_(subchannel), |
|
|
@ -897,7 +897,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
grpc_connectivity_state CheckConnectivityState() override { |
|
|
|
grpc_connectivity_state CheckConnectivityState() override { |
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel; |
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel; |
|
|
|
grpc_connectivity_state connectivity_state = |
|
|
|
grpc_connectivity_state connectivity_state = |
|
|
|
subchannel_->CheckConnectivityState(health_check_service_name_, |
|
|
|
subchannel_->CheckConnectivityState(health_check_service_name_.get(), |
|
|
|
&connected_subchannel); |
|
|
|
&connected_subchannel); |
|
|
|
MaybeUpdateConnectedSubchannel(std::move(connected_subchannel)); |
|
|
|
MaybeUpdateConnectedSubchannel(std::move(connected_subchannel)); |
|
|
|
return connectivity_state; |
|
|
|
return connectivity_state; |
|
|
@ -912,7 +912,9 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
Ref(DEBUG_LOCATION, "WatcherWrapper"), |
|
|
|
Ref(DEBUG_LOCATION, "WatcherWrapper"), |
|
|
|
initial_state); |
|
|
|
initial_state); |
|
|
|
subchannel_->WatchConnectivityState( |
|
|
|
subchannel_->WatchConnectivityState( |
|
|
|
initial_state, health_check_service_name_, |
|
|
|
initial_state, |
|
|
|
|
|
|
|
grpc_core::UniquePtr<char>( |
|
|
|
|
|
|
|
gpr_strdup(health_check_service_name_.get())), |
|
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>( |
|
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>( |
|
|
|
watcher_wrapper)); |
|
|
|
watcher_wrapper)); |
|
|
|
} |
|
|
|
} |
|
|
@ -921,7 +923,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
ConnectivityStateWatcherInterface* watcher) override { |
|
|
|
ConnectivityStateWatcherInterface* watcher) override { |
|
|
|
auto it = watcher_map_.find(watcher); |
|
|
|
auto it = watcher_map_.find(watcher); |
|
|
|
GPR_ASSERT(it != watcher_map_.end()); |
|
|
|
GPR_ASSERT(it != watcher_map_.end()); |
|
|
|
subchannel_->CancelConnectivityStateWatch(health_check_service_name_, |
|
|
|
subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(), |
|
|
|
it->second); |
|
|
|
it->second); |
|
|
|
watcher_map_.erase(it); |
|
|
|
watcher_map_.erase(it); |
|
|
|
} |
|
|
|
} |
|
|
@ -934,13 +936,14 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
return subchannel_->channel_args(); |
|
|
|
return subchannel_->channel_args(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void UpdateHealthCheckServiceName(std::string health_check_service_name) { |
|
|
|
void UpdateHealthCheckServiceName( |
|
|
|
|
|
|
|
grpc_core::UniquePtr<char> health_check_service_name) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
"chand=%p: subchannel wrapper %p: updating health check service " |
|
|
|
"chand=%p: subchannel wrapper %p: updating health check service " |
|
|
|
"name from \"%s\" to \"%s\"", |
|
|
|
"name from \"%s\" to \"%s\"", |
|
|
|
chand_, this, health_check_service_name_.c_str(), |
|
|
|
chand_, this, health_check_service_name_.get(), |
|
|
|
health_check_service_name.c_str()); |
|
|
|
health_check_service_name.get()); |
|
|
|
} |
|
|
|
} |
|
|
|
for (auto& p : watcher_map_) { |
|
|
|
for (auto& p : watcher_map_) { |
|
|
|
WatcherWrapper*& watcher_wrapper = p.second; |
|
|
|
WatcherWrapper*& watcher_wrapper = p.second; |
|
|
@ -955,11 +958,13 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
// problem, we may be able to handle it by waiting for the new
|
|
|
|
// problem, we may be able to handle it by waiting for the new
|
|
|
|
// watcher to report READY before we use it to replace the old one.
|
|
|
|
// watcher to report READY before we use it to replace the old one.
|
|
|
|
WatcherWrapper* replacement = watcher_wrapper->MakeReplacement(); |
|
|
|
WatcherWrapper* replacement = watcher_wrapper->MakeReplacement(); |
|
|
|
subchannel_->CancelConnectivityStateWatch(health_check_service_name_, |
|
|
|
subchannel_->CancelConnectivityStateWatch( |
|
|
|
watcher_wrapper); |
|
|
|
health_check_service_name_.get(), watcher_wrapper); |
|
|
|
watcher_wrapper = replacement; |
|
|
|
watcher_wrapper = replacement; |
|
|
|
subchannel_->WatchConnectivityState( |
|
|
|
subchannel_->WatchConnectivityState( |
|
|
|
replacement->last_seen_state(), health_check_service_name, |
|
|
|
replacement->last_seen_state(), |
|
|
|
|
|
|
|
grpc_core::UniquePtr<char>( |
|
|
|
|
|
|
|
gpr_strdup(health_check_service_name.get())), |
|
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>( |
|
|
|
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>( |
|
|
|
replacement)); |
|
|
|
replacement)); |
|
|
|
} |
|
|
|
} |
|
|
@ -1097,7 +1102,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface { |
|
|
|
|
|
|
|
|
|
|
|
ChannelData* chand_; |
|
|
|
ChannelData* chand_; |
|
|
|
Subchannel* subchannel_; |
|
|
|
Subchannel* subchannel_; |
|
|
|
std::string health_check_service_name_; |
|
|
|
grpc_core::UniquePtr<char> health_check_service_name_; |
|
|
|
// Maps from the address of the watcher passed to us by the LB policy
|
|
|
|
// Maps from the address of the watcher passed to us by the LB policy
|
|
|
|
// to the address of the WrapperWatcher that we passed to the underlying
|
|
|
|
// to the address of the WrapperWatcher that we passed to the underlying
|
|
|
|
// subchannel. This is needed so that when the LB policy calls
|
|
|
|
// subchannel. This is needed so that when the LB policy calls
|
|
|
@ -1260,9 +1265,10 @@ class ChannelData::ClientChannelControlHelper |
|
|
|
const grpc_channel_args& args) override { |
|
|
|
const grpc_channel_args& args) override { |
|
|
|
bool inhibit_health_checking = grpc_channel_arg_get_bool( |
|
|
|
bool inhibit_health_checking = grpc_channel_arg_get_bool( |
|
|
|
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); |
|
|
|
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); |
|
|
|
std::string health_check_service_name; |
|
|
|
grpc_core::UniquePtr<char> health_check_service_name; |
|
|
|
if (!inhibit_health_checking) { |
|
|
|
if (!inhibit_health_checking) { |
|
|
|
health_check_service_name = chand_->health_check_service_name_; |
|
|
|
health_check_service_name.reset( |
|
|
|
|
|
|
|
gpr_strdup(chand_->health_check_service_name_.get())); |
|
|
|
} |
|
|
|
} |
|
|
|
static const char* args_to_remove[] = { |
|
|
|
static const char* args_to_remove[] = { |
|
|
|
GRPC_ARG_INHIBIT_HEALTH_CHECKING, |
|
|
|
GRPC_ARG_INHIBIT_HEALTH_CHECKING, |
|
|
@ -1457,7 +1463,7 @@ void ChannelData::UpdateStateAndPickerLocked( |
|
|
|
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) { |
|
|
|
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) { |
|
|
|
// Clean the control plane when entering IDLE.
|
|
|
|
// Clean the control plane when entering IDLE.
|
|
|
|
if (picker_ == nullptr) { |
|
|
|
if (picker_ == nullptr) { |
|
|
|
health_check_service_name_.clear(); |
|
|
|
health_check_service_name_.reset(); |
|
|
|
saved_service_config_.reset(); |
|
|
|
saved_service_config_.reset(); |
|
|
|
received_first_resolver_result_ = false; |
|
|
|
received_first_resolver_result_ = false; |
|
|
|
} |
|
|
|
} |
|
|
@ -1700,15 +1706,16 @@ bool ChannelData::ProcessResolverResultLocked( |
|
|
|
} |
|
|
|
} |
|
|
|
// Save health check service name.
|
|
|
|
// Save health check service name.
|
|
|
|
if (service_config != nullptr) { |
|
|
|
if (service_config != nullptr) { |
|
|
|
chand->health_check_service_name_ = |
|
|
|
chand->health_check_service_name_.reset( |
|
|
|
parsed_service_config->health_check_service_name(); |
|
|
|
gpr_strdup(parsed_service_config->health_check_service_name())); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
chand->health_check_service_name_.clear(); |
|
|
|
chand->health_check_service_name_.reset(); |
|
|
|
} |
|
|
|
} |
|
|
|
// Update health check service name used by existing subchannel wrappers.
|
|
|
|
// Update health check service name used by existing subchannel wrappers.
|
|
|
|
for (auto* subchannel_wrapper : chand->subchannel_wrappers_) { |
|
|
|
for (auto* subchannel_wrapper : chand->subchannel_wrappers_) { |
|
|
|
subchannel_wrapper->UpdateHealthCheckServiceName( |
|
|
|
subchannel_wrapper->UpdateHealthCheckServiceName( |
|
|
|
chand->health_check_service_name_); |
|
|
|
grpc_core::UniquePtr<char>( |
|
|
|
|
|
|
|
gpr_strdup(chand->health_check_service_name_.get()))); |
|
|
|
} |
|
|
|
} |
|
|
|
// Save service config.
|
|
|
|
// Save service config.
|
|
|
|
chand->saved_service_config_ = std::move(service_config); |
|
|
|
chand->saved_service_config_ = std::move(service_config); |
|
|
|