Support xds request timeouts.

pull/21775/head
Mark D. Roth 5 years ago
parent 25d6f5f18f
commit d7a91d59bb
  1. 5
      include/grpc/impl/codegen/grpc_types.h
  2. 2
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  3. 6
      src/core/ext/filters/client_channel/xds/xds_api.cc
  4. 15
      src/core/ext/filters/client_channel/xds/xds_api.h
  5. 446
      src/core/ext/filters/client_channel/xds/xds_client.cc
  6. 18
      src/core/ext/filters/client_channel/xds/xds_client.h
  7. 5
      src/core/lib/gprpp/string_view.h
  8. 75
      test/cpp/end2end/xds_end2end_test.cc

@ -360,6 +360,11 @@ typedef struct {
of that priority fail to connect. If 0, failover happens immediately. Default
value is 10 seconds. */
#define GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS "grpc.xds_failover_timeout_ms"
/* Timeout in milliseconds to wait for a resource to be returned from
* the xds server before assuming that it does not exist.
* The default is 15 seconds. */
#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \
"grpc.xds_resource_does_not_exist_timeout_ms"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression"

@ -68,6 +68,7 @@ class XdsResolver : public Resolver {
void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
RefCountedPtr<ServiceConfig> service_config) {
if (resolver_->xds_client_ == nullptr) return;
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args =
@ -77,6 +78,7 @@ void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
}
void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
if (resolver_->xds_client_ == nullptr) return;
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args =

@ -1119,10 +1119,12 @@ void LocalityStatsPopulate(
} // namespace
grpc_slice XdsLrsRequestCreateAndEncode(
std::map<StringView, std::set<XdsClientStats*>> client_stats_map) {
std::map<StringView, std::set<XdsClientStats*>, StringLess>
client_stats_map) {
upb::Arena arena;
// Get the snapshots.
std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>>
std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>,
StringLess>
snapshot_map;
for (auto& p : client_stats_map) {
const StringView& cluster_name = p.first;

@ -41,19 +41,6 @@ constexpr char kCdsTypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
constexpr char kEdsTypeUrl[] =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
// The version state for each specific ADS resource type.
struct VersionState {
// The version of the latest response that is accepted and used.
std::string version_info;
// The nonce of the latest response.
std::string nonce;
// The error message to be included in a NACK with the nonce. Consumed when a
// nonce is NACK'ed for the first time.
grpc_error* error = GRPC_ERROR_NONE;
~VersionState() { GRPC_ERROR_UNREF(error); }
};
struct RdsUpdate {
// The name to use in the CDS request.
std::string cluster_name;
@ -246,7 +233,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
// Creates an LRS request sending client-side load reports. If all the counters
// are zero, returns empty slice.
grpc_slice XdsLrsRequestCreateAndEncode(
std::map<StringView /*cluster_name*/, std::set<XdsClientStats*>>
std::map<StringView /*cluster_name*/, std::set<XdsClientStats*>, StringLess>
client_stats_map);
// Parses the LRS response and returns \a

@ -125,31 +125,122 @@ class XdsClient::ChannelState::AdsCallState
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
// If \a type_url is an unsupported type, \a nonce_for_unsupported_type and
// \a error_for_unsupported_type will be used in the request; otherwise, the
// nonce and error stored in each ADS call state will be used. Takes ownership
// of \a error_for_unsupported_type.
void SendMessageLocked(const std::string& type_url,
const std::string& nonce_for_unsupported_type,
grpc_error* error_for_unsupported_type,
bool is_first_message);
void Subscribe(const std::string& type_url, const std::string& name);
void Unsubscribe(const std::string& type_url, const std::string& name);
bool HasSubscribedResources() const;
private:
struct BufferedRequest {
std::string nonce;
grpc_error* error;
class ResourceState : public InternallyRefCounted<ResourceState> {
public:
// Records the resource's type URL and name, and initializes timer_callback_
// to invoke OnTimer with this object as its argument. No timer is started
// here.
ResourceState(const std::string& type_url, const std::string& name)
: type_url_(type_url), name_(name) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
// Called when the owner gives up this object: cancels the timer if it is
// still pending (via Finish()) and then drops one ref on this object.
void Orphan() override {
Finish();
Unref();
}
// Arms the timeout timer for this resource the first time it is called;
// later calls do nothing. Takes a ref to the ADS call in \a ads_calld and
// an extra ref on this object that is released in OnTimerLocked().
void Start(RefCountedPtr<AdsCallState> ads_calld) {
  if (!sent_) {
    sent_ = true;
    ads_calld_ = std::move(ads_calld);
    // Ref held by the timer callback.
    Ref().release();
    timer_pending_ = true;
    const auto deadline =
        ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_;
    grpc_timer_init(&timer_, deadline, &timer_callback_);
  }
}
// Cancels the timeout timer if it is still pending; otherwise a no-op.
void Finish() {
  if (!timer_pending_) return;
  grpc_timer_cancel(&timer_);
  timer_pending_ = false;
}
// Takes ownership of \a error.
BufferedRequest(std::string nonce, grpc_error* error)
: nonce(std::move(nonce)), error(error) {}
private:
static void OnTimer(void* arg, grpc_error* error) {
ResourceState* self = static_cast<ResourceState*>(arg);
self->ads_calld_->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&self->timer_callback_, OnTimerLocked, self,
nullptr),
GRPC_ERROR_REF(error));
}
~BufferedRequest() { GRPC_ERROR_UNREF(error); }
static void OnTimerLocked(void* arg, grpc_error* error) {
ResourceState* self = static_cast<ResourceState*>(arg);
if (error == GRPC_ERROR_NONE && self->timer_pending_) {
self->timer_pending_ = false;
char* msg;
gpr_asprintf(
&msg,
"timeout obtaining resource {type=%s name=%s} from xds server",
self->type_url_.c_str(), self->name_.c_str());
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] %s",
self->ads_calld_->xds_client(), grpc_error_string(error));
}
if (self->type_url_ == kLdsTypeUrl || self->type_url_ == kRdsTypeUrl) {
self->ads_calld_->xds_client()->service_config_watcher_->OnError(
error);
} else if (self->type_url_ == kCdsTypeUrl) {
ClusterState& state =
self->ads_calld_->xds_client()->cluster_map_[self->name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
} else if (self->type_url_ == kEdsTypeUrl) {
EndpointState& state =
self->ads_calld_->xds_client()->endpoint_map_[self->name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
} else {
GPR_UNREACHABLE_CODE(return );
}
}
self->ads_calld_.reset();
self->Unref();
}
const std::string type_url_;
const std::string name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool sent_ = false;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
};
void AcceptLdsUpdate(LdsUpdate lds_update, std::string new_version);
void AcceptRdsUpdate(RdsUpdate rds_update, std::string new_version);
void AcceptCdsUpdate(CdsUpdateMap cds_update_map, std::string new_version);
void AcceptEdsUpdate(EdsUpdateMap eds_update_map, std::string new_version);
// Bookkeeping kept per resource type (keyed by type URL in state_map_).
struct ResourceTypeState {
  // Version, nonce, and error recorded for this resource type.
  std::string version;
  std::string nonce;
  grpc_error* error = GRPC_ERROR_NONE;
  // Resources of this type that are currently subscribed, keyed by name.
  std::map<std::string /* name */, OrphanablePtr<ResourceState>>
      subscribed_resources;

  // Drops the ref held on `error`.
  ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
};
void SendMessageLocked(const std::string& type_url);
void AcceptLdsUpdate(LdsUpdate lds_update);
void AcceptRdsUpdate(RdsUpdate rds_update);
void AcceptCdsUpdate(CdsUpdateMap cds_update_map);
void AcceptEdsUpdate(EdsUpdateMap eds_update_map);
static void OnRequestSent(void* arg, grpc_error* error);
static void OnRequestSentLocked(void* arg, grpc_error* error);
@ -160,8 +251,13 @@ class XdsClient::ChannelState::AdsCallState
bool IsCurrentCallOnChannel() const;
std::set<StringView> ClusterNamesForRequest();
std::set<StringView> EdsServiceNamesForRequest();
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;
bool sent_initial_message_ = false;
bool seen_response_ = false;
// Always non-NULL.
@ -184,15 +280,11 @@ class XdsClient::ChannelState::AdsCallState
grpc_slice status_details_;
grpc_closure on_status_received_;
// Version state.
VersionState lds_version_;
VersionState rds_version_;
VersionState cds_version_;
VersionState eds_version_;
// Resource types for which requests need to be sent.
std::set<std::string /*type_url*/> buffered_requests_;
// Buffered requests.
std::map<std::string /*type_url*/, std::unique_ptr<BufferedRequest>>
buffered_request_map_;
// State for each resource type.
std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
};
// Contains an LRS call to the xds server.
@ -445,31 +537,30 @@ void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
}
void XdsClient::ChannelState::OnResourceNamesChanged(
const std::string& type_url) {
void XdsClient::ChannelState::Subscribe(const std::string& type_url,
const std::string& name) {
if (ads_calld_ == nullptr) {
// Start the ADS call if this is the first request.
ads_calld_.reset(new RetryableCall<AdsCallState>(
Ref(DEBUG_LOCATION, "ChannelState+ads")));
// Note: AdsCallState's ctor will automatically send necessary messages, so
// we can return here.
// Note: AdsCallState's ctor will automatically subscribe to all
// resources that the XdsClient already has watchers for, so we can
// return here.
return;
}
// If the ADS call is in backoff state, we don't need to do anything now
// because when the call is restarted it will resend all necessary requests.
if (ads_calld() == nullptr) return;
// Send the message if the ADS call is active.
ads_calld()->SendMessageLocked(type_url, "", nullptr, false);
// Subscribe to this resource if the ADS call is active.
ads_calld()->Subscribe(type_url, name);
}
void XdsClient::ChannelState::OnWatcherRemoved() {
// Keep the ADS call if there are watcher(s).
for (const auto& p : xds_client()->cluster_map_) {
const ClusterState& cluster_state = p.second;
if (!cluster_state.watchers.empty()) return;
void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
const std::string& name) {
if (ads_calld_ != nullptr) {
ads_calld_->calld()->Unsubscribe(type_url, name);
if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
}
if (!xds_client()->endpoint_map_.empty()) return;
ads_calld_.reset();
}
//
@ -620,22 +711,14 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// Op: send request message.
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
grpc_schedule_on_exec_ctx);
bool initial_message = true;
if (xds_client()->service_config_watcher_ != nullptr) {
if (xds_client()->route_config_name_.empty()) {
SendMessageLocked(kLdsTypeUrl, "", nullptr, initial_message);
initial_message = false;
} else if (xds_client()->cluster_name_.empty()) {
SendMessageLocked(kRdsTypeUrl, "", nullptr, initial_message);
initial_message = false;
}
Subscribe(kLdsTypeUrl, xds_client()->server_name_);
}
if (!xds_client()->cluster_map_.empty()) {
SendMessageLocked(kCdsTypeUrl, "", nullptr, initial_message);
initial_message = false;
for (const auto& p : xds_client()->cluster_map_) {
Subscribe(kCdsTypeUrl, std::string(p.first));
}
if (!xds_client()->endpoint_map_.empty()) {
SendMessageLocked(kEdsTypeUrl, "", nullptr, initial_message);
for (const auto& p : xds_client()->endpoint_map_) {
Subscribe(kEdsTypeUrl, std::string(p.first));
}
// Op: recv initial metadata.
op = ops;
@ -693,51 +776,49 @@ void XdsClient::ChannelState::AdsCallState::Orphan() {
// we are here because xds_client has to orphan a failed call, then the
// following cancellation will be a no-op.
grpc_call_cancel(call_, nullptr);
state_map_.clear();
// Note that the initial ref is held by on_status_received_. So the
// corresponding unref happens in on_status_received_ instead of here.
}
void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
const std::string& type_url, const std::string& nonce_for_unsupported_type,
grpc_error* error_for_unsupported_type, bool is_first_message) {
const std::string& type_url) {
// Buffer message sending if an existing message is in flight.
if (send_message_payload_ != nullptr) {
buffered_request_map_[type_url].reset(new BufferedRequest(
nonce_for_unsupported_type, error_for_unsupported_type));
buffered_requests_.insert(type_url);
return;
}
grpc_slice request_payload_slice;
auto& state = state_map_[type_url];
grpc_error* error = state.error;
state.error = GRPC_ERROR_NONE;
const XdsBootstrap::Node* node =
is_first_message ? xds_client()->bootstrap_->node() : nullptr;
sent_initial_message_ ? nullptr : xds_client()->bootstrap_->node();
const char* build_version =
is_first_message ? xds_client()->build_version_.get() : nullptr;
sent_initial_message_ ? nullptr : xds_client()->build_version_.get();
sent_initial_message_ = true;
grpc_slice request_payload_slice;
if (type_url == kLdsTypeUrl) {
request_payload_slice = XdsLdsRequestCreateAndEncode(
xds_client()->server_name_, node, build_version,
lds_version_.version_info, lds_version_.nonce, lds_version_.error);
lds_version_.error = GRPC_ERROR_NONE;
GRPC_ERROR_UNREF(error_for_unsupported_type);
xds_client()->server_name_, node, build_version, state.version,
state.nonce, error);
state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
} else if (type_url == kRdsTypeUrl) {
request_payload_slice = XdsRdsRequestCreateAndEncode(
xds_client()->route_config_name_, node, build_version,
rds_version_.version_info, rds_version_.nonce, rds_version_.error);
rds_version_.error = GRPC_ERROR_NONE;
GRPC_ERROR_UNREF(error_for_unsupported_type);
xds_client()->route_config_name_, node, build_version, state.version,
state.nonce, error);
state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref());
} else if (type_url == kCdsTypeUrl) {
request_payload_slice = XdsCdsRequestCreateAndEncode(
xds_client()->WatchedClusterNames(), node, build_version,
cds_version_.version_info, cds_version_.nonce, cds_version_.error);
cds_version_.error = GRPC_ERROR_NONE;
GRPC_ERROR_UNREF(error_for_unsupported_type);
ClusterNamesForRequest(), node, build_version, state.version,
state.nonce, error);
} else if (type_url == kEdsTypeUrl) {
request_payload_slice = XdsEdsRequestCreateAndEncode(
xds_client()->EdsServiceNames(), node, build_version,
eds_version_.version_info, eds_version_.nonce, eds_version_.error);
eds_version_.error = GRPC_ERROR_NONE;
GRPC_ERROR_UNREF(error_for_unsupported_type);
EdsServiceNamesForRequest(), node, build_version, state.version,
state.nonce, error);
} else {
request_payload_slice = XdsUnsupportedTypeNackRequestCreateAndEncode(
type_url, nonce_for_unsupported_type, error_for_unsupported_type);
type_url, state.nonce, state.error);
state_map_.erase(type_url);
}
// Create message payload.
send_message_payload_ =
@ -761,8 +842,30 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
}
}
void XdsClient::ChannelState::AdsCallState::Subscribe(
const std::string& type_url, const std::string& name) {
auto& state = state_map_[type_url].subscribed_resources[name];
if (state == nullptr) {
state = MakeOrphanable<ResourceState>(type_url, name);
SendMessageLocked(type_url);
}
}
// Drops the subscription for resource \a name of type \a type_url (if any)
// and then always asks for a request of that type to be sent.
void XdsClient::ChannelState::AdsCallState::Unsubscribe(
    const std::string& type_url, const std::string& name) {
  auto& type_state = state_map_[type_url];
  type_state.subscribed_resources.erase(name);
  SendMessageLocked(type_url);
}
bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
for (const auto& p : state_map_) {
if (!p.second.subscribed_resources.empty()) return true;
}
return false;
}
void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
LdsUpdate lds_update, std::string new_version) {
LdsUpdate lds_update) {
const std::string& cluster_name =
lds_update.rds_update.has_value()
? lds_update.rds_update.value().cluster_name
@ -775,6 +878,9 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
xds_client(), lds_update.route_config_name.c_str(),
cluster_name.c_str());
}
auto& lds_state = state_map_[kLdsTypeUrl];
auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
if (state != nullptr) state->Finish();
// Ignore identical update.
if (xds_client()->route_config_name_ == lds_update.route_config_name &&
xds_client()->cluster_name_ == cluster_name) {
@ -802,19 +908,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
}
} else {
// Send RDS request for dynamic resolution.
SendMessageLocked(kRdsTypeUrl, "", nullptr, false);
Subscribe(kRdsTypeUrl, xds_client()->route_config_name_);
}
lds_version_.version_info = std::move(new_version);
}
void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
RdsUpdate rds_update, std::string new_version) {
RdsUpdate rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] RDS update received: "
"cluster_name=%s",
xds_client(), rds_update.cluster_name.c_str());
}
auto& rds_state = state_map_[kRdsTypeUrl];
auto& state =
rds_state.subscribed_resources[xds_client()->route_config_name_];
if (state != nullptr) state->Finish();
// Ignore identical update.
if (xds_client()->cluster_name_ == rds_update.cluster_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
@ -835,14 +944,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
} else {
xds_client()->service_config_watcher_->OnError(error);
}
rds_version_.version_info = std::move(new_version);
}
void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
CdsUpdateMap cds_update_map, std::string new_version) {
CdsUpdateMap cds_update_map) {
auto& cds_state = state_map_[kCdsTypeUrl];
for (auto& p : cds_update_map) {
const char* cluster_name = p.first.c_str();
CdsUpdate& cds_update = p.second;
auto& state = cds_state.subscribed_resources[cluster_name];
if (state != nullptr) state->Finish();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] CDS update (cluster=%s) received: "
@ -875,14 +986,16 @@ void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
p.first->OnClusterChanged(cluster_state.update.value());
}
}
cds_version_.version_info = std::move(new_version);
}
void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
EdsUpdateMap eds_update_map, std::string new_version) {
EdsUpdateMap eds_update_map) {
auto& eds_state = state_map_[kEdsTypeUrl];
for (auto& p : eds_update_map) {
const char* eds_service_name = p.first.c_str();
EdsUpdate& eds_update = p.second;
auto& state = eds_state.subscribed_resources[eds_service_name];
if (state != nullptr) state->Finish();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] EDS response with %" PRIuPTR
@ -956,7 +1069,6 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
p.first->OnEndpointChanged(endpoint_state.update);
}
}
eds_version_.version_info = std::move(new_version);
}
void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
@ -977,22 +1089,17 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
self->send_message_payload_ = nullptr;
// Continue to send another pending message if any.
// TODO(roth): The current code to handle buffered messages has the
// advantage of sending only the most recent list of resource names for each
// resource type (no matter how many times that resource type has been
// requested to send while the current message sending is still pending).
// But its disadvantage is that we send the requests in fixed order of
// resource types. We need to fix this if we are seeing some resource
// type(s) starved due to frequent requests of other resource type(s).
for (auto& p : self->buffered_request_map_) {
const std::string& type_url = p.first;
std::unique_ptr<BufferedRequest>& buffered_request = p.second;
if (buffered_request != nullptr) {
self->SendMessageLocked(type_url, buffered_request->nonce,
buffered_request->error, false);
buffered_request->error = GRPC_ERROR_NONE;
buffered_request.reset();
break;
}
// advantage of sending only the most recent list of resource names for
// each resource type (no matter how many times that resource type has
// been requested to send while the current message sending is still
// pending). But its disadvantage is that we send the requests in fixed
// order of resource types. We need to fix this if we are seeing some
// resource type(s) starved due to frequent requests of other resource
// type(s).
auto it = self->buffered_requests_.begin();
if (it != self->buffered_requests_.end()) {
self->SendMessageLocked(*it);
self->buffered_requests_.erase(it);
}
}
self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
@ -1043,8 +1150,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
// Note that XdsAdsResponseDecodeAndParse() also validates the response.
grpc_error* parse_error = XdsAdsResponseDecodeAndParse(
response_slice, xds_client->server_name_, xds_client->route_config_name_,
xds_client->EdsServiceNames(), &lds_update, &rds_update, &cds_update_map,
&eds_update_map, &version, &nonce, &type_url);
ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
&cds_update_map, &eds_update_map, &version, &nonce, &type_url);
grpc_slice_unref_internal(response_slice);
if (type_url.empty()) {
// Ignore unparsable response.
@ -1052,48 +1159,34 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
xds_client, grpc_error_string(parse_error));
GRPC_ERROR_UNREF(parse_error);
} else {
// Update nonce and error.
if (type_url == kLdsTypeUrl) {
ads_calld->lds_version_.nonce = nonce;
GRPC_ERROR_UNREF(ads_calld->lds_version_.error);
ads_calld->lds_version_.error = GRPC_ERROR_REF(parse_error);
} else if (type_url == kRdsTypeUrl) {
ads_calld->rds_version_.nonce = nonce;
GRPC_ERROR_UNREF(ads_calld->rds_version_.error);
ads_calld->rds_version_.error = GRPC_ERROR_REF(parse_error);
} else if (type_url == kCdsTypeUrl) {
ads_calld->cds_version_.nonce = nonce;
GRPC_ERROR_UNREF(ads_calld->cds_version_.error);
ads_calld->cds_version_.error = GRPC_ERROR_REF(parse_error);
} else if (type_url == kEdsTypeUrl) {
ads_calld->eds_version_.nonce = nonce;
GRPC_ERROR_UNREF(ads_calld->eds_version_.error);
ads_calld->eds_version_.error = GRPC_ERROR_REF(parse_error);
}
// Update nonce.
auto& state = ads_calld->state_map_[type_url];
state.nonce = std::move(nonce);
// NACK or ACK the response.
if (parse_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(state.error);
state.error = parse_error;
// NACK unacceptable update.
gpr_log(
GPR_ERROR,
"[xds_client %p] ADS response can't be accepted, NACKing. error=%s",
xds_client, grpc_error_string(parse_error));
ads_calld->SendMessageLocked(type_url, nonce, parse_error, false);
ads_calld->SendMessageLocked(type_url);
} else {
ads_calld->seen_response_ = true;
// Accept the ADS response according to the type_url.
if (type_url == kLdsTypeUrl) {
ads_calld->AcceptLdsUpdate(std::move(lds_update), std::move(version));
ads_calld->AcceptLdsUpdate(std::move(lds_update));
} else if (type_url == kRdsTypeUrl) {
ads_calld->AcceptRdsUpdate(std::move(rds_update), std::move(version));
ads_calld->AcceptRdsUpdate(std::move(rds_update));
} else if (type_url == kCdsTypeUrl) {
ads_calld->AcceptCdsUpdate(std::move(cds_update_map),
std::move(version));
ads_calld->AcceptCdsUpdate(std::move(cds_update_map));
} else if (type_url == kEdsTypeUrl) {
ads_calld->AcceptEdsUpdate(std::move(eds_update_map),
std::move(version));
ads_calld->AcceptEdsUpdate(std::move(eds_update_map));
}
state.version = std::move(version);
// ACK the update.
ads_calld->SendMessageLocked(type_url, nonce, nullptr, false);
ads_calld->SendMessageLocked(type_url);
// Start load reporting if needed.
auto& lrs_call = ads_calld->chand()->lrs_calld_;
if (lrs_call != nullptr) {
@ -1164,6 +1257,28 @@ bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
return this == chand()->ads_calld_->calld();
}
std::set<StringView>
XdsClient::ChannelState::AdsCallState::ClusterNamesForRequest() {
std::set<StringView> cluster_names;
for (auto& p : state_map_[kCdsTypeUrl].subscribed_resources) {
cluster_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
}
return cluster_names;
}
std::set<StringView>
XdsClient::ChannelState::AdsCallState::EdsServiceNamesForRequest() {
std::set<StringView> eds_names;
for (auto& p : state_map_[kEdsTypeUrl].subscribed_resources) {
eds_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref());
}
return eds_names;
}
//
// XdsClient::ChannelState::LrsCallState::Reporter
//
@ -1584,6 +1699,12 @@ bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
namespace {
// Looks up GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS in \a args and
// returns it as a grpc_millis. The first value in the braces (15000) is the
// default; the remaining two are presumably the allowed min and max --
// confirm against grpc_channel_args_find_integer().
grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
  const auto timeout_ms = grpc_channel_args_find_integer(
      &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
      {15000, 0, INT_MAX});
  return timeout_ms;
}
UniquePtr<char> GenerateBuildVersionString() {
char* build_version_str;
gpr_asprintf(&build_version_str, "gRPC C-core %s %s", grpc_version_string(),
@ -1598,6 +1719,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
std::unique_ptr<ServiceConfigWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout(channel_args)),
build_version_(GenerateBuildVersionString()),
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
interested_parties_(interested_parties),
@ -1618,7 +1740,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args);
if (service_config_watcher_ != nullptr) {
chand_->OnResourceNamesChanged(kLdsTypeUrl);
chand_->Subscribe(kLdsTypeUrl, std::string(server_name));
}
}
@ -1634,8 +1756,8 @@ void XdsClient::Orphan() {
void XdsClient::WatchClusterData(
StringView cluster_name, std::unique_ptr<ClusterWatcherInterface> watcher) {
const bool new_name = cluster_map_.find(cluster_name) == cluster_map_.end();
ClusterState& cluster_state = cluster_map_[cluster_name];
std::string cluster_name_str = std::string(cluster_name);
ClusterState& cluster_state = cluster_map_[cluster_name_str];
ClusterWatcherInterface* w = watcher.get();
cluster_state.watchers[w] = std::move(watcher);
// If we've already received a CDS update, notify the new watcher
@ -1643,30 +1765,29 @@ void XdsClient::WatchClusterData(
if (cluster_state.update.has_value()) {
w->OnClusterChanged(cluster_state.update.value());
}
if (new_name) chand_->OnResourceNamesChanged(kCdsTypeUrl);
chand_->Subscribe(kCdsTypeUrl, cluster_name_str);
}
void XdsClient::CancelClusterDataWatch(StringView cluster_name,
ClusterWatcherInterface* watcher) {
if (shutting_down_) return;
ClusterState& cluster_state = cluster_map_[cluster_name];
std::string cluster_name_str = std::string(cluster_name);
ClusterState& cluster_state = cluster_map_[cluster_name_str];
auto it = cluster_state.watchers.find(watcher);
if (it != cluster_state.watchers.end()) {
cluster_state.watchers.erase(it);
if (cluster_state.watchers.empty()) {
cluster_map_.erase(cluster_name);
chand_->OnResourceNamesChanged(kCdsTypeUrl);
cluster_map_.erase(cluster_name_str);
chand_->Unsubscribe(kCdsTypeUrl, cluster_name_str);
}
}
chand_->OnWatcherRemoved();
}
void XdsClient::WatchEndpointData(
StringView eds_service_name,
std::unique_ptr<EndpointWatcherInterface> watcher) {
const bool new_name =
endpoint_map_.find(eds_service_name) == endpoint_map_.end();
EndpointState& endpoint_state = endpoint_map_[eds_service_name];
std::string eds_service_name_str = std::string(eds_service_name);
EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
EndpointWatcherInterface* w = watcher.get();
endpoint_state.watchers[w] = std::move(watcher);
// If we've already received an EDS update, notify the new watcher
@ -1674,28 +1795,28 @@ void XdsClient::WatchEndpointData(
if (!endpoint_state.update.priority_list_update.empty()) {
w->OnEndpointChanged(endpoint_state.update);
}
if (new_name) chand_->OnResourceNamesChanged(kEdsTypeUrl);
chand_->Subscribe(kEdsTypeUrl, eds_service_name_str);
}
void XdsClient::CancelEndpointDataWatch(StringView eds_service_name,
EndpointWatcherInterface* watcher) {
if (shutting_down_) return;
EndpointState& endpoint_state = endpoint_map_[eds_service_name];
std::string eds_service_name_str = std::string(eds_service_name);
EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
auto it = endpoint_state.watchers.find(watcher);
if (it != endpoint_state.watchers.end()) {
endpoint_state.watchers.erase(it);
if (endpoint_state.watchers.empty()) {
endpoint_map_.erase(eds_service_name);
chand_->OnResourceNamesChanged(kEdsTypeUrl);
endpoint_map_.erase(eds_service_name_str);
chand_->Unsubscribe(kEdsTypeUrl, eds_service_name_str);
}
}
chand_->OnWatcherRemoved();
}
void XdsClient::AddClientStats(StringView /*lrs_server*/,
StringView cluster_name,
XdsClientStats* client_stats) {
EndpointState& endpoint_state = endpoint_map_[cluster_name];
EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
endpoint_state.client_stats.insert(client_stats);
@ -1705,7 +1826,7 @@ void XdsClient::AddClientStats(StringView /*lrs_server*/,
void XdsClient::RemoveClientStats(StringView /*lrs_server*/,
StringView cluster_name,
XdsClientStats* client_stats) {
EndpointState& endpoint_state = endpoint_map_[cluster_name];
EndpointState& endpoint_state = endpoint_map_[std::string(cluster_name)];
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
// TODO(roth): In principle, we should try to send a final load report
@ -1745,32 +1866,11 @@ grpc_error* XdsClient::CreateServiceConfig(
return error;
}
std::set<StringView> XdsClient::WatchedClusterNames() const {
std::set<StringView> cluster_names;
for (const auto& p : cluster_map_) {
const StringView& cluster_name = p.first;
const ClusterState& cluster_state = p.second;
// Don't request for the clusters that are cached before watched.
if (cluster_state.watchers.empty()) continue;
cluster_names.emplace(cluster_name);
}
return cluster_names;
}
std::set<StringView> XdsClient::EdsServiceNames() const {
std::set<StringView> eds_service_names;
for (const auto& p : endpoint_map_) {
const StringView& eds_service_name = p.first;
eds_service_names.emplace(eds_service_name);
}
return eds_service_names;
}
std::map<StringView, std::set<XdsClientStats*>> XdsClient::ClientStatsMap()
const {
std::map<StringView, std::set<XdsClientStats*>> client_stats_map;
std::map<StringView, std::set<XdsClientStats*>, StringLess>
XdsClient::ClientStatsMap() const {
std::map<StringView, std::set<XdsClientStats*>, StringLess> client_stats_map;
for (const auto& p : endpoint_map_) {
const StringView& cluster_name = p.first;
const StringView cluster_name = p.first;
const auto& client_stats = p.second.client_stats;
if (chand_->lrs_calld()->ShouldSendLoadReports(cluster_name)) {
client_stats_map.emplace(cluster_name, client_stats);

@ -153,8 +153,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked();
void OnResourceNamesChanged(const std::string& type_url);
void OnWatcherRemoved();
void Subscribe(const std::string& type_url, const std::string& name);
void Unsubscribe(const std::string& type_url, const std::string& name);
private:
class StateWatcher;
@ -195,11 +195,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
const std::string& cluster_name,
RefCountedPtr<ServiceConfig>* service_config) const;
std::set<StringView> WatchedClusterNames() const;
std::set<StringView> EdsServiceNames() const;
std::map<StringView, std::set<XdsClientStats*>> ClientStatsMap() const;
std::map<StringView, std::set<XdsClientStats*>, StringLess> ClientStatsMap()
const;
// Channel arg vtable functions.
static void* ChannelArgCopy(void* p);
@ -208,6 +205,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static const grpc_arg_pointer_vtable kXdsClientVtable;
const grpc_millis request_timeout_;
grpc_core::UniquePtr<char> build_version_;
Combiner* combiner_;
@ -225,10 +224,9 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
std::string route_config_name_;
std::string cluster_name_;
// All received clusters are cached, whether or not they are watched.
std::map<StringView /*cluster_name*/, ClusterState, StringLess> cluster_map_;
std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
// Only the watched EDS service names are stored.
std::map<StringView /*eds_service_name*/, EndpointState, StringLess>
endpoint_map_;
std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
bool shutting_down_ = false;
};

@ -75,6 +75,11 @@ class StringView final {
: StringView(ptr, ptr == nullptr ? 0 : strlen(ptr)) {}
constexpr StringView() : StringView(nullptr, 0) {}
// Constructs a view from a std::basic_string<char> using any allocator by
// delegating to the (pointer, size) constructor with str.data() and
// str.size(). Not marked explicit, so such strings convert to StringView
// implicitly.
// NOTE(review): presumably the view only refers to str's buffer and must
// not outlive it -- confirm against the (pointer, size) constructor.
template <typename Allocator>
StringView(
const std::basic_string<char, std::char_traits<char>, Allocator>& str)
: StringView(str.data(), str.size()) {}
constexpr const char* data() const { return ptr_; }
constexpr size_t size() const { return size_; }
constexpr bool empty() const { return size_ == 0; }

@ -427,6 +427,7 @@ class AdsServiceImpl : public AdsService {
const std::string version_str = "version_1";
const std::string nonce_str = "nonce_1";
grpc_core::MutexLock lock(&ads_mu_);
if (lds_ignore_) return;
if (lds_response_state_ == NOT_SENT) {
DiscoveryResponse response;
response.set_type_url(kLdsTypeUrl);
@ -452,6 +453,7 @@ class AdsServiceImpl : public AdsService {
const std::string version_str = "version_1";
const std::string nonce_str = "nonce_1";
grpc_core::MutexLock lock(&ads_mu_);
if (rds_ignore_) return;
if (rds_response_state_ == NOT_SENT) {
DiscoveryResponse response;
response.set_type_url(kRdsTypeUrl);
@ -477,6 +479,7 @@ class AdsServiceImpl : public AdsService {
const std::string version_str = "version_1";
const std::string nonce_str = "nonce_1";
grpc_core::MutexLock lock(&ads_mu_);
if (cds_ignore_) return;
if (cds_response_state_ == NOT_SENT) {
DiscoveryResponse response;
response.set_type_url(kCdsTypeUrl);
@ -503,6 +506,7 @@ class AdsServiceImpl : public AdsService {
std::vector<ResponseDelayPair> responses_and_delays;
{
grpc_core::MutexLock lock(&ads_mu_);
if (eds_ignore_) return;
responses_and_delays = eds_responses_and_delays_;
}
// Send response.
@ -538,8 +542,13 @@ class AdsServiceImpl : public AdsService {
// resource names). It's not causing a big problem now but should be
// fixed.
bool eds_sent = false;
bool seen_first_request = false;
while (!eds_sent || cds_response_state_ == SENT) {
if (!stream->Read(&request)) return;
if (!seen_first_request) {
EXPECT_TRUE(request.has_node());
seen_first_request = true;
}
if (request.type_url() == kLdsTypeUrl) {
HandleLdsRequest(&request, stream);
} else if (request.type_url() == kRdsTypeUrl) {
@ -585,23 +594,31 @@ class AdsServiceImpl : public AdsService {
lds_response_data_ = std::move(lds_response_data);
}
void set_lds_ignore() { lds_ignore_ = true; }
// Replaces the stored RDS response data (a map keyed by route config name)
// with rds_response_data, which is taken by value and moved into place.
// NOTE(review): unlike AddEdsResponse(), this does not take ads_mu_;
// presumably callers invoke it before any request is being served --
// confirm.
void SetRdsResponse(
std::map<std::string /*route_config_name*/, RouteConfiguration>
rds_response_data) {
rds_response_data_ = std::move(rds_response_data);
}
void set_rds_ignore() { rds_ignore_ = true; }
// Replaces the stored CDS response data (a map keyed by cluster name) with
// cds_response_data, which is taken by value and moved into place.
// NOTE(review): unlike AddEdsResponse(), this does not take ads_mu_;
// presumably callers invoke it before any request is being served --
// confirm.
void SetCdsResponse(
std::map<std::string /*cluster_name*/, Cluster> cds_response_data) {
cds_response_data_ = std::move(cds_response_data);
}
void set_cds_ignore() { cds_ignore_ = true; }
void AddEdsResponse(const DiscoveryResponse& response, int send_after_ms) {
grpc_core::MutexLock lock(&ads_mu_);
eds_responses_and_delays_.push_back(
std::make_pair(response, send_after_ms));
}
void set_eds_ignore() { eds_ignore_ = true; }
void SetLdsToUseDynamicRds() {
auto listener = default_listener_;
HttpConnectionManager http_connection_manager;
@ -701,17 +718,21 @@ class AdsServiceImpl : public AdsService {
Listener default_listener_;
std::map<std::string /*server_name*/, Listener> lds_response_data_;
ResponseState lds_response_state_ = NOT_SENT;
bool lds_ignore_ = false;
// RDS response data.
RouteConfiguration default_route_config_;
std::map<std::string /*route_config_name*/, RouteConfiguration>
rds_response_data_;
ResponseState rds_response_state_ = NOT_SENT;
bool rds_ignore_ = false;
// CDS response data.
Cluster default_cluster_;
std::map<std::string /*cluster_name*/, Cluster> cds_response_data_;
ResponseState cds_response_state_ = NOT_SENT;
bool cds_ignore_ = false;
// EDS response data.
std::vector<ResponseDelayPair> eds_responses_and_delays_;
bool eds_ignore_ = false;
};
class LrsServiceImpl : public LrsService {
@ -895,7 +916,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
// Calls Shutdown() on the backend stored at backends_[index]. No bounds
// check on index is performed here.
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
void ResetStub(int fallback_timeout = 0, int failover_timeout = 0,
const grpc::string& expected_targets = "") {
const grpc::string& expected_targets = "",
int xds_resource_does_not_exist_timeout = 0) {
ChannelArguments args;
// TODO(juanlishen): Add setter to ChannelArguments.
if (fallback_timeout > 0) {
@ -904,6 +926,10 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout);
}
if (xds_resource_does_not_exist_timeout > 0) {
args.SetInt(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
xds_resource_does_not_exist_timeout);
}
// If the parent channel is using the fake resolver, we inject the
// response generator for the parent here, and then SetNextResolution()
// will inject the xds channel's response generator via the parent's
@ -1625,6 +1651,15 @@ TEST_P(LdsTest, RouteActionHasNoCluster) {
AdsServiceImpl::NACKED);
}
// Tests that the RPC fails when the ADS server never answers LDS requests
// and a 500 ms resource-does-not-exist timeout is configured.
TEST_P(LdsTest, Timeout) {
  const int kResourceDoesNotExistTimeoutMs = 500;
  ResetStub(/*fallback_timeout=*/0, /*failover_timeout=*/0,
            /*expected_targets=*/"", kResourceDoesNotExistTimeoutMs);
  // Have the fake ADS service drop every LDS request.
  balancers_[0]->ads_service()->set_lds_ignore();
  SetNextResolution({});
  SetNextResolutionForLbChannelAllBalancers();
  CheckRpcSendFailure();
}
using RdsTest = BasicTest;
// Tests that RDS client should send an ACK upon correct RDS response.
@ -1751,6 +1786,16 @@ TEST_P(RdsTest, RouteActionHasNoCluster) {
AdsServiceImpl::NACKED);
}
// Tests that the RPC fails when the ADS server never answers RDS requests
// and a 500 ms resource-does-not-exist timeout is configured.
TEST_P(RdsTest, Timeout) {
  const int kResourceDoesNotExistTimeoutMs = 500;
  ResetStub(/*fallback_timeout=*/0, /*failover_timeout=*/0,
            /*expected_targets=*/"", kResourceDoesNotExistTimeoutMs);
  // Switch LDS to dynamic RDS, then have the fake ADS service drop every
  // RDS request.
  balancers_[0]->ads_service()->SetLdsToUseDynamicRds();
  balancers_[0]->ads_service()->set_rds_ignore();
  SetNextResolution({});
  SetNextResolutionForLbChannelAllBalancers();
  CheckRpcSendFailure();
}
using CdsTest = BasicTest;
// Tests that CDS client should send an ACK upon correct CDS response.
@ -1818,6 +1863,27 @@ TEST_P(CdsTest, WrongLrsServer) {
AdsServiceImpl::NACKED);
}
// Tests that the RPC fails when the ADS server never answers CDS requests
// and a 500 ms resource-does-not-exist timeout is configured.
TEST_P(CdsTest, Timeout) {
  const int kResourceDoesNotExistTimeoutMs = 500;
  ResetStub(/*fallback_timeout=*/0, /*failover_timeout=*/0,
            /*expected_targets=*/"", kResourceDoesNotExistTimeoutMs);
  // Have the fake ADS service drop every CDS request.
  balancers_[0]->ads_service()->set_cds_ignore();
  SetNextResolution({});
  SetNextResolutionForLbChannelAllBalancers();
  CheckRpcSendFailure();
}
using EdsTest = BasicTest;
// TODO(roth): Add tests showing that RPCs fail when EDS data is invalid.
// Tests that the RPC fails when the ADS server never answers EDS requests
// and a 500 ms resource-does-not-exist timeout is configured.
TEST_P(EdsTest, Timeout) {
  const int kResourceDoesNotExistTimeoutMs = 500;
  ResetStub(/*fallback_timeout=*/0, /*failover_timeout=*/0,
            /*expected_targets=*/"", kResourceDoesNotExistTimeoutMs);
  // Have the fake ADS service drop every EDS request.
  balancers_[0]->ads_service()->set_eds_ignore();
  SetNextResolution({});
  SetNextResolutionForLbChannelAllBalancers();
  CheckRpcSendFailure();
}
using LocalityMapTest = BasicTest;
// Tests that the localities in a locality map are picked according to their
@ -3031,6 +3097,13 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest,
TestType(true, true)),
&TestTypeName);
// EDS could be tested with or without XdsResolver, but the tests would
// be the same either way, so we test it only with XdsResolver.
INSTANTIATE_TEST_SUITE_P(XdsTest, EdsTest,
::testing::Values(TestType(true, false),
TestType(true, true)),
&TestTypeName);
INSTANTIATE_TEST_SUITE_P(XdsTest, LocalityMapTest,
::testing::Values(TestType(false, true),
TestType(false, false),

Loading…
Cancel
Save