Improve xds logging.

pull/22309/head
Mark D. Roth 5 years ago
parent 35ee039c29
commit af62a34ae5
  1. 5
      doc/environment_variables.md
  2. 44
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  3. 36
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  4. 32
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  5. 658
      src/core/ext/filters/client_channel/xds/xds_api.cc
  6. 6
      src/core/ext/filters/client_channel/xds/xds_api.h
  7. 87
      src/core/ext/filters/client_channel/xds/xds_bootstrap.cc
  8. 6
      src/core/ext/filters/client_channel/xds/xds_bootstrap.h
  9. 72
      src/core/ext/filters/client_channel/xds/xds_client.cc

@ -49,6 +49,7 @@ some configuration as environment variables that can be set.
- cares_resolver - traces operations of the c-ares based DNS resolver
- cares_address_sorting - traces operations of the c-ares based DNS
resolver's resolved address sorter
- cds_lb - traces cds LB policy
- channel - traces operations on the C core channel stack
- client_channel_call - traces client channel call batch activity
- client_channel_routing - traces client channel call routing, including
@ -77,11 +78,15 @@ some configuration as environment variables that can be set.
- server_channel - lightweight trace of significant server channel events
- secure_endpoint - traces bytes flowing through encrypted channels
- subchannel - traces the connectivity state of subchannel
- subchannel_pool - traces subchannel pool
- timer - timers (alarms) in the grpc internals
- timer_check - more detailed trace of timer logic in grpc internals
- transport_security - traces metadata about secure channel establishment
- tcp - traces bytes in and out of a channel
- tsi - traces tsi transport security
- xds_client - traces xds client
- xds_lb - traces xds LB policy
- xds_resolver - traces xds resolver
The following tracers will only run in binaries built in DEBUG mode. This is
accomplished by invoking `CONFIG=dbg make <target>`

@ -113,8 +113,14 @@ class CdsLb : public LoadBalancingPolicy {
void CdsLb::ClusterWatcher::OnClusterChanged(XdsApi::CdsUpdate cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received CDS update from xds client",
parent_.get());
gpr_log(GPR_INFO,
"[cdslb %p] received CDS update from xds client %p: "
"eds_service_name=%s lrs_load_reporting_server_name=%s",
parent_.get(), parent_->xds_client_.get(),
cluster_data.eds_service_name.c_str(),
cluster_data.lrs_load_reporting_server_name.has_value()
? cluster_data.lrs_load_reporting_server_name.value().c_str()
: "(unset)");
}
// Construct config for child policy.
Json::Object child_config = {
@ -152,9 +158,18 @@ void CdsLb::ClusterWatcher::OnClusterChanged(XdsApi::CdsUpdate cluster_data) {
parent_->child_policy_ =
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"xds_experimental", std::move(args));
if (parent_->child_policy_ == nullptr) {
OnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to create xds_experimental child policy"));
return;
}
grpc_pollset_set_add_pollset_set(
parent_->child_policy_->interested_parties(),
parent_->interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created child policy xds_experimental (%p)",
parent_.get(), parent_->child_policy_.get());
}
}
// Update child policy.
UpdateArgs args;
@ -220,9 +235,9 @@ void CdsLb::Helper::AddTraceEvent(TraceSeverity severity, StringView message) {
CdsLb::CdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
xds_client_(XdsClient::GetFromChannelArgs(*args.args)) {
if (xds_client_ != nullptr && GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] Using xds client %p from channel", this,
xds_client_.get());
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p from channel",
this, xds_client_.get());
}
}
@ -245,6 +260,10 @@ void CdsLb::ShutdownLocked() {
}
if (xds_client_ != nullptr) {
if (cluster_watcher_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
config_->cluster().c_str());
}
xds_client_->CancelClusterDataWatch(
StringView(config_->cluster().c_str()), cluster_watcher_);
}
@ -257,12 +276,13 @@ void CdsLb::ResetBackoffLocked() {
}
void CdsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received update", this);
}
// Update config.
auto old_config = std::move(config_);
config_ = std::move(args.config);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this,
config_->cluster().c_str());
}
// Update args.
grpc_channel_args_destroy(args_);
args_ = args.args;
@ -270,9 +290,17 @@ void CdsLb::UpdateLocked(UpdateArgs args) {
// If cluster name changed, cancel watcher and restart.
if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
if (old_config != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
old_config->cluster().c_str());
}
xds_client_->CancelClusterDataWatch(
StringView(old_config->cluster().c_str()), cluster_watcher_);
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
config_->cluster().c_str());
}
auto watcher = absl::make_unique<ClusterWatcher>(Ref());
cluster_watcher_ = watcher.get();
xds_client_->WatchClusterData(StringView(config_->cluster().c_str()),

@ -69,7 +69,7 @@
namespace grpc_core {
TraceFlag grpc_lb_xds_trace(false, "xds");
TraceFlag grpc_lb_xds_trace(false, "xds_lb");
namespace {
@ -619,6 +619,9 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
if (strstr(grpc_error_string(error), "xds call failed")) {
xds_policy_->channel_control_helper()->RequestReresolution();
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] xds watcher reported error (ignoring): %s",
xds_policy_.get(), grpc_error_string(error));
}
GRPC_ERROR_UNREF(error);
}
@ -643,9 +646,8 @@ XdsLb::XdsLb(Args args)
locality_map_failover_timeout_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS,
{GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})) {
if (xds_client_from_channel_ != nullptr &&
GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this,
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] created -- xds client from channel: %p", this,
xds_client_from_channel_.get());
}
// Record server name.
@ -687,6 +689,10 @@ void XdsLb::ShutdownLocked() {
// destroying the Xds client leading to a situation where the Xds lb policy is
// never destroyed.
if (xds_client_from_channel_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] cancelling watch for %s", this,
eds_service_name());
}
xds_client()->CancelEndpointDataWatch(StringView(eds_service_name()),
endpoint_watcher_);
xds_client_from_channel_.reset();
@ -781,9 +787,17 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
if (is_initial_update ||
strcmp(old_eds_service_name, eds_service_name()) != 0) {
if (!is_initial_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] cancelling watch for %s", this,
old_eds_service_name);
}
xds_client()->CancelEndpointDataWatch(StringView(old_eds_service_name),
endpoint_watcher_);
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] starting watch for %s", this,
eds_service_name());
}
auto watcher = absl::make_unique<EndpointWatcher>(
Ref(DEBUG_LOCATION, "EndpointWatcher"));
endpoint_watcher_ = watcher.get();
@ -1071,6 +1085,9 @@ void XdsLb::LocalityMap::ResetBackoffLocked() {
}
void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] constructing new picker", xds_policy());
}
// Construct a new xds picker which maintains a map of all locality pickers
// that are ready. Each locality is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
@ -1086,6 +1103,11 @@ void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
end += locality->weight();
picker_list.push_back(
std::make_pair(end, locality->GetLoadReportingPicker()));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] locality=%s weight=%d picker=%p",
xds_policy(), locality_name->AsHumanReadableString(),
locality->weight(), picker_list.back().second.get());
}
}
xds_policy()->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY,
@ -1492,6 +1514,12 @@ XdsLb::LocalityMap::Locality::Helper::CreateSubchannel(
void XdsLb::LocalityMap::Locality::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (locality_->xds_policy()->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p helper %p] child policy handler %p reports state=%s",
locality_->xds_policy(), this, locality_->child_policy_.get(),
ConnectivityStateName(state));
}
// Cache the state and picker in the locality.
locality_->connectivity_state_ = state;
locality_->picker_wrapper_ =

@ -24,6 +24,8 @@
namespace grpc_core {
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
namespace {
//
@ -38,14 +40,28 @@ class XdsResolver : public Resolver {
interested_parties_(args.pollset_set) {
char* path = args.uri->path;
if (path[0] == '/') ++path;
server_name_.reset(gpr_strdup(path));
server_name_ = path;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
server_name_.c_str());
}
}
~XdsResolver() override { grpc_channel_args_destroy(args_); }
~XdsResolver() override {
grpc_channel_args_destroy(args_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
}
}
void StartLocked() override;
void ShutdownLocked() override { xds_client_.reset(); }
void ShutdownLocked() override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
}
xds_client_.reset();
}
private:
class ServiceConfigWatcher : public XdsClient::ServiceConfigWatcherInterface {
@ -60,7 +76,7 @@ class XdsResolver : public Resolver {
RefCountedPtr<XdsResolver> resolver_;
};
grpc_core::UniquePtr<char> server_name_;
std::string server_name_;
const grpc_channel_args* args_;
grpc_pollset_set* interested_parties_;
OrphanablePtr<XdsClient> xds_client_;
@ -69,6 +85,10 @@ class XdsResolver : public Resolver {
void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
RefCountedPtr<ServiceConfig> service_config) {
if (resolver_->xds_client_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated service config: %s",
resolver_.get(), service_config->json_string().c_str());
}
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args =
@ -79,6 +99,8 @@ void XdsResolver::ServiceConfigWatcher::OnServiceConfigChanged(
void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
if (resolver_->xds_client_ == nullptr) return;
gpr_log(GPR_ERROR, "[xds_resolver %p] received error: %s", resolver_.get(),
grpc_error_string(error));
grpc_arg xds_client_arg = resolver_->xds_client_->MakeChannelArg();
Result result;
result.args =
@ -90,7 +112,7 @@ void XdsResolver::ServiceConfigWatcher::OnError(grpc_error* error) {
void XdsResolver::StartLocked() {
grpc_error* error = GRPC_ERROR_NONE;
xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties_, StringView(server_name_.get()),
combiner(), interested_parties_, server_name_,
absl::make_unique<ServiceConfigWatcher>(Ref()), *args_, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,

@ -23,6 +23,7 @@
#include <cstdlib>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include <grpc/impl/codegen/log.h>
#include <grpc/support/alloc.h>
@ -125,8 +126,11 @@ const char* XdsApi::kCdsTypeUrl = "type.googleapis.com/envoy.api.v2.Cluster";
const char* XdsApi::kEdsTypeUrl =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
XdsApi::XdsApi(const XdsBootstrap::Node* node)
: node_(node),
XdsApi::XdsApi(XdsClient* client, TraceFlag* tracer,
const XdsBootstrap::Node* node)
: client_(client),
tracer_(tracer),
node_(node),
build_version_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING, " ",
grpc_version_string())),
user_agent_name_(absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING)) {}
@ -289,6 +293,162 @@ envoy_api_v2_DiscoveryRequest* CreateDiscoveryRequest(
return request;
}
inline absl::string_view UpbStringToAbsl(const upb_strview& str) {
return absl::string_view(str.data, str.size);
}
inline void AddStringField(const char* name, const upb_strview& value,
std::vector<std::string>* fields,
bool add_if_empty = false) {
if (value.size > 0 || add_if_empty) {
fields->emplace_back(
absl::StrCat(name, ": \"", UpbStringToAbsl(value), "\""));
}
}
inline void AddLocalityField(int indent_level,
const envoy_api_v2_core_Locality* locality,
std::vector<std::string>* fields) {
std::string indent =
absl::StrJoin(std::vector<std::string>(indent_level, " "), "");
// region
std::string field = absl::StrCat(indent, "region");
AddStringField(field.c_str(), envoy_api_v2_core_Locality_region(locality),
fields);
// zone
field = absl::StrCat(indent, "zone");
AddStringField(field.c_str(), envoy_api_v2_core_Locality_zone(locality),
fields);
// sub_zone
field = absl::StrCat(indent, "sub_zone");
AddStringField(field.c_str(), envoy_api_v2_core_Locality_sub_zone(locality),
fields);
}
void AddNodeLogFields(const envoy_api_v2_core_Node* node,
std::vector<std::string>* fields) {
fields->emplace_back("node {");
// id
AddStringField(" id", envoy_api_v2_core_Node_id(node), fields);
// metadata
const google_protobuf_Struct* metadata =
envoy_api_v2_core_Node_metadata(node);
if (metadata != nullptr) {
fields->emplace_back(" metadata {");
size_t num_entries;
const google_protobuf_Struct_FieldsEntry* const* entries =
google_protobuf_Struct_fields(metadata, &num_entries);
for (size_t i = 0; i < num_entries; ++i) {
fields->emplace_back(" field {");
// key
AddStringField(" key",
google_protobuf_Struct_FieldsEntry_key(entries[i]),
fields);
// value
const google_protobuf_Value* value =
google_protobuf_Struct_FieldsEntry_value(entries[i]);
if (value != nullptr) {
std::string value_str;
if (google_protobuf_Value_has_string_value(value)) {
value_str = absl::StrCat(
"string_value: \"",
UpbStringToAbsl(google_protobuf_Value_string_value(value)), "\"");
} else if (google_protobuf_Value_has_null_value(value)) {
value_str = "null_value: NULL_VALUE";
} else if (google_protobuf_Value_has_number_value(value)) {
value_str = absl::StrCat("double_value: ",
google_protobuf_Value_number_value(value));
} else if (google_protobuf_Value_has_bool_value(value)) {
value_str = absl::StrCat("bool_value: ",
google_protobuf_Value_bool_value(value));
} else if (google_protobuf_Value_has_struct_value(value)) {
value_str = "struct_value: <not printed>";
} else if (google_protobuf_Value_has_list_value(value)) {
value_str = "list_value: <not printed>";
} else {
value_str = "<unknown>";
}
fields->emplace_back(absl::StrCat(" value { ", value_str, " }"));
}
fields->emplace_back(" }");
}
fields->emplace_back(" }");
}
// locality
const envoy_api_v2_core_Locality* locality =
envoy_api_v2_core_Node_locality(node);
if (locality != nullptr) {
fields->emplace_back(" locality {");
AddLocalityField(2, locality, fields);
fields->emplace_back(" }");
}
// build_version
AddStringField(" build_version", envoy_api_v2_core_Node_build_version(node),
fields);
// user_agent_name
AddStringField(" user_agent_name",
envoy_api_v2_core_Node_user_agent_name(node), fields);
// user_agent_version
AddStringField(" user_agent_version",
envoy_api_v2_core_Node_user_agent_version(node), fields);
// client_features
size_t num_client_features;
const upb_strview* client_features =
envoy_api_v2_core_Node_client_features(node, &num_client_features);
for (size_t i = 0; i < num_client_features; ++i) {
AddStringField(" client_features", client_features[i], fields);
}
fields->emplace_back("}");
}
void MaybeLogDiscoveryRequest(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryRequest* request) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
// TODO(roth): When we can upgrade upb, use upb textformat code to dump
// the raw proto instead of doing this manually.
std::vector<std::string> fields;
// version_info
AddStringField("version_info",
envoy_api_v2_DiscoveryRequest_version_info(request),
&fields);
// node
const envoy_api_v2_core_Node* node =
envoy_api_v2_DiscoveryRequest_node(request);
if (node != nullptr) AddNodeLogFields(node, &fields);
// resource_names
size_t num_resource_names;
const upb_strview* resource_names =
envoy_api_v2_DiscoveryRequest_resource_names(request,
&num_resource_names);
for (size_t i = 0; i < num_resource_names; ++i) {
AddStringField("resource_names", resource_names[i], &fields);
}
// type_url
AddStringField("type_url", envoy_api_v2_DiscoveryRequest_type_url(request),
&fields);
// response_nonce
AddStringField("response_nonce",
envoy_api_v2_DiscoveryRequest_response_nonce(request),
&fields);
// error_detail
const struct google_rpc_Status* error_detail =
envoy_api_v2_DiscoveryRequest_error_detail(request);
if (error_detail != nullptr) {
fields.emplace_back("error_detail {");
// code
int32_t code = google_rpc_Status_code(error_detail);
if (code != 0) fields.emplace_back(absl::StrCat(" code: ", code));
// message
AddStringField(" message", google_rpc_Status_message(error_detail),
&fields);
fields.emplace_back("}");
}
gpr_log(GPR_DEBUG, "[xds_client %p] constructed ADS request: %s", client,
absl::StrJoin(fields, "\n").c_str());
}
}
grpc_slice SerializeDiscoveryRequest(upb_arena* arena,
envoy_api_v2_DiscoveryRequest* request) {
size_t output_length;
@ -305,6 +465,7 @@ grpc_slice XdsApi::CreateUnsupportedTypeNackRequest(const std::string& type_url,
upb::Arena arena;
envoy_api_v2_DiscoveryRequest* request = CreateDiscoveryRequest(
arena.ptr(), type_url.c_str(), /*version=*/"", nonce, error);
MaybeLogDiscoveryRequest(client_, tracer_, request);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
@ -326,6 +487,7 @@ grpc_slice XdsApi::CreateLdsRequest(const std::string& server_name,
envoy_api_v2_DiscoveryRequest_add_resource_names(
request, upb_strview_make(server_name.data(), server_name.size()),
arena.ptr());
MaybeLogDiscoveryRequest(client_, tracer_, request);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
@ -348,6 +510,7 @@ grpc_slice XdsApi::CreateRdsRequest(const std::string& route_config_name,
request,
upb_strview_make(route_config_name.data(), route_config_name.size()),
arena.ptr());
MaybeLogDiscoveryRequest(client_, tracer_, request);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
@ -371,6 +534,7 @@ grpc_slice XdsApi::CreateCdsRequest(const std::set<StringView>& cluster_names,
request, upb_strview_make(cluster_name.data(), cluster_name.size()),
arena.ptr());
}
MaybeLogDiscoveryRequest(client_, tracer_, request);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
@ -394,11 +558,347 @@ grpc_slice XdsApi::CreateEdsRequest(
upb_strview_make(eds_service_name.data(), eds_service_name.size()),
arena.ptr());
}
MaybeLogDiscoveryRequest(client_, tracer_, request);
return SerializeDiscoveryRequest(arena.ptr(), request);
}
namespace {
void MaybeLogDiscoveryResponse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
// TODO(roth): When we can upgrade upb, use upb textformat code to dump
// the raw proto instead of doing this manually.
std::vector<std::string> fields;
// version_info
AddStringField("version_info",
envoy_api_v2_DiscoveryResponse_version_info(response),
&fields);
// resources
size_t num_resources;
envoy_api_v2_DiscoveryResponse_resources(response, &num_resources);
fields.emplace_back(
absl::StrCat("resources: <", num_resources, " element(s)>"));
// type_url
AddStringField("type_url",
envoy_api_v2_DiscoveryResponse_type_url(response), &fields);
// nonce
AddStringField("nonce", envoy_api_v2_DiscoveryResponse_nonce(response),
&fields);
gpr_log(GPR_DEBUG, "[xds_client %p] received response: %s", client,
absl::StrJoin(fields, "\n").c_str());
}
}
void MaybeLogRouteConfiguration(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_RouteConfiguration* route_config) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
// TODO(roth): When we can upgrade upb, use upb textformat code to dump
// the raw proto instead of doing this manually.
std::vector<std::string> fields;
// name
AddStringField("name", envoy_api_v2_RouteConfiguration_name(route_config),
&fields);
// virtual_hosts
size_t num_virtual_hosts;
const envoy_api_v2_route_VirtualHost* const* virtual_hosts =
envoy_api_v2_RouteConfiguration_virtual_hosts(route_config,
&num_virtual_hosts);
for (size_t i = 0; i < num_virtual_hosts; ++i) {
const auto* virtual_host = virtual_hosts[i];
fields.push_back("virtual_hosts {");
// name
AddStringField(
" name", envoy_api_v2_route_VirtualHost_name(virtual_host), &fields);
// domains
size_t num_domains;
const upb_strview* const domains =
envoy_api_v2_route_VirtualHost_domains(virtual_host, &num_domains);
for (size_t j = 0; j < num_domains; ++j) {
AddStringField(" domains", domains[j], &fields);
}
// routes
size_t num_routes;
const envoy_api_v2_route_Route* const* routes =
envoy_api_v2_route_VirtualHost_routes(virtual_host, &num_routes);
for (size_t j = 0; j < num_routes; ++j) {
const auto* route = routes[j];
fields.push_back(" route {");
// name
AddStringField(" name", envoy_api_v2_route_Route_name(route),
&fields);
// match
const envoy_api_v2_route_RouteMatch* match =
envoy_api_v2_route_Route_match(route);
if (match != nullptr) {
fields.emplace_back(" match {");
// path matching
if (envoy_api_v2_route_RouteMatch_has_prefix(match)) {
AddStringField(" prefix",
envoy_api_v2_route_RouteMatch_prefix(match), &fields,
/*add_if_empty=*/true);
} else if (envoy_api_v2_route_RouteMatch_has_path(match)) {
AddStringField(" path",
envoy_api_v2_route_RouteMatch_path(match), &fields,
/*add_if_empty=*/true);
} else if (envoy_api_v2_route_RouteMatch_has_regex(match)) {
AddStringField(" regex",
envoy_api_v2_route_RouteMatch_regex(match), &fields,
/*add_if_empty=*/true);
} else if (envoy_api_v2_route_RouteMatch_has_safe_regex(match)) {
fields.emplace_back(" safe_regex: <not printed>");
} else {
fields.emplace_back(" <unknown path matching type>");
}
// header matching
size_t num_headers;
envoy_api_v2_route_RouteMatch_headers(match, &num_headers);
if (num_headers > 0) {
fields.emplace_back(
absl::StrCat(" headers: <", num_headers, " element(s)>"));
}
fields.emplace_back(" }");
}
// action
if (envoy_api_v2_route_Route_has_route(route)) {
const envoy_api_v2_route_RouteAction* action =
envoy_api_v2_route_Route_route(route);
fields.emplace_back(" route {");
if (envoy_api_v2_route_RouteAction_has_cluster(action)) {
AddStringField(" cluster",
envoy_api_v2_route_RouteAction_cluster(action),
&fields);
} else if (envoy_api_v2_route_RouteAction_has_cluster_header(
action)) {
AddStringField(
" cluster_header",
envoy_api_v2_route_RouteAction_cluster_header(action), &fields);
} else if (envoy_api_v2_route_RouteAction_has_weighted_clusters(
action)) {
fields.emplace_back(" weighted_clusters: <not printed>");
}
fields.emplace_back(" }");
} else if (envoy_api_v2_route_Route_has_redirect(route)) {
fields.emplace_back(" redirect: <not printed>");
} else if (envoy_api_v2_route_Route_has_direct_response(route)) {
fields.emplace_back(" direct_response: <not printed>");
} else if (envoy_api_v2_route_Route_has_filter_action(route)) {
fields.emplace_back(" filter_action: <not printed>");
}
fields.push_back(" }");
}
fields.push_back("}");
}
gpr_log(GPR_DEBUG, "[xds_client %p] RouteConfiguration: %s", client,
absl::StrJoin(fields, "\n").c_str());
}
}
void MaybeLogCluster(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_Cluster* cluster) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
// TODO(roth): When we can upgrade upb, use upb textformat code to dump
// the raw proto instead of doing this manually.
std::vector<std::string> fields;
// name
AddStringField("name", envoy_api_v2_Cluster_name(cluster), &fields);
// type
if (envoy_api_v2_Cluster_has_type(cluster)) {
fields.emplace_back(
absl::StrCat("type: ", envoy_api_v2_Cluster_type(cluster)));
} else if (envoy_api_v2_Cluster_has_cluster_type(cluster)) {
fields.emplace_back("cluster_type: <not printed>");
} else {
fields.emplace_back("<unknown type>");
}
// eds_cluster_config
const envoy_api_v2_Cluster_EdsClusterConfig* eds_cluster_config =
envoy_api_v2_Cluster_eds_cluster_config(cluster);
if (eds_cluster_config != nullptr) {
fields.emplace_back("eds_cluster_config {");
// eds_config
const struct envoy_api_v2_core_ConfigSource* eds_config =
envoy_api_v2_Cluster_EdsClusterConfig_eds_config(eds_cluster_config);
if (eds_config != nullptr) {
if (envoy_api_v2_core_ConfigSource_has_ads(eds_config)) {
fields.emplace_back(" eds_config { ads {} }");
} else {
fields.emplace_back(" eds_config: <non-ADS type>");
}
}
// service_name
AddStringField(" service_name",
envoy_api_v2_Cluster_EdsClusterConfig_service_name(
eds_cluster_config),
&fields);
fields.emplace_back("}");
}
// lb_policy
fields.emplace_back(
absl::StrCat("lb_policy: ", envoy_api_v2_Cluster_lb_policy(cluster)));
// lrs_server
const envoy_api_v2_core_ConfigSource* lrs_server =
envoy_api_v2_Cluster_lrs_server(cluster);
if (lrs_server != nullptr) {
if (envoy_api_v2_core_ConfigSource_has_self(lrs_server)) {
fields.emplace_back("lrs_server { self {} }");
} else {
fields.emplace_back("lrs_server: <non-self type>");
}
}
gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", client,
absl::StrJoin(fields, "\n").c_str());
}
}
void MaybeLogClusterLoadAssignment(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_ClusterLoadAssignment* cla) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
// TODO(roth): When we can upgrade upb, use upb textformat code to dump
// the raw proto instead of doing this manually.
std::vector<std::string> fields;
// cluster_name
AddStringField("cluster_name",
envoy_api_v2_ClusterLoadAssignment_cluster_name(cla),
&fields);
// endpoints
size_t num_localities;
const struct envoy_api_v2_endpoint_LocalityLbEndpoints* const*
locality_endpoints =
envoy_api_v2_ClusterLoadAssignment_endpoints(cla, &num_localities);
for (size_t i = 0; i < num_localities; ++i) {
const auto* locality_endpoint = locality_endpoints[i];
fields.emplace_back("endpoints {");
// locality
const auto* locality =
envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_endpoint);
if (locality != nullptr) {
fields.emplace_back(" locality {");
AddLocalityField(2, locality, &fields);
fields.emplace_back(" }");
}
// lb_endpoints
size_t num_lb_endpoints;
const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints(
locality_endpoint, &num_lb_endpoints);
for (size_t j = 0; j < num_lb_endpoints; ++j) {
const auto* lb_endpoint = lb_endpoints[j];
fields.emplace_back(" lb_endpoints {");
// health_status
uint32_t health_status =
envoy_api_v2_endpoint_LbEndpoint_health_status(lb_endpoint);
if (health_status > 0) {
fields.emplace_back(
absl::StrCat(" health_status: ", health_status));
}
// endpoint
const envoy_api_v2_endpoint_Endpoint* endpoint =
envoy_api_v2_endpoint_LbEndpoint_endpoint(lb_endpoint);
if (endpoint != nullptr) {
fields.emplace_back(" endpoint {");
// address
const auto* address =
envoy_api_v2_endpoint_Endpoint_address(endpoint);
if (address != nullptr) {
fields.emplace_back(" address {");
// socket_address
const auto* socket_address =
envoy_api_v2_core_Address_socket_address(address);
if (socket_address != nullptr) {
fields.emplace_back(" socket_address {");
// address
AddStringField(
" address",
envoy_api_v2_core_SocketAddress_address(socket_address),
&fields);
// port_value
if (envoy_api_v2_core_SocketAddress_has_port_value(
socket_address)) {
fields.emplace_back(
absl::StrCat(" port_value: ",
envoy_api_v2_core_SocketAddress_port_value(
socket_address)));
} else {
fields.emplace_back(" <non-numeric port>");
}
fields.emplace_back(" }");
} else {
fields.emplace_back(" <non-socket address>");
}
fields.emplace_back(" }");
}
fields.emplace_back(" }");
}
fields.emplace_back(" }");
}
// load_balancing_weight
const google_protobuf_UInt32Value* lb_weight =
envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight(
locality_endpoint);
if (lb_weight != nullptr) {
fields.emplace_back(
absl::StrCat(" load_balancing_weight { value: ",
google_protobuf_UInt32Value_value(lb_weight), " }"));
}
// priority
uint32_t priority =
envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_endpoint);
if (priority > 0) {
fields.emplace_back(absl::StrCat(" priority: ", priority));
}
fields.emplace_back("}");
}
// policy
const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
envoy_api_v2_ClusterLoadAssignment_policy(cla);
if (policy != nullptr) {
fields.emplace_back("policy {");
// drop_overloads
size_t num_drop_overloads;
const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
drop_overloads =
envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(
policy, &num_drop_overloads);
for (size_t i = 0; i < num_drop_overloads; ++i) {
auto* drop_overload = drop_overloads[i];
fields.emplace_back(" drop_overloads {");
// category
AddStringField(
" category",
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_category(
drop_overload),
&fields);
// drop_percentage
const auto* drop_percentage =
envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
drop_overload);
if (drop_percentage != nullptr) {
fields.emplace_back(" drop_percentage {");
fields.emplace_back(absl::StrCat(
" numerator: ",
envoy_type_FractionalPercent_numerator(drop_percentage)));
fields.emplace_back(absl::StrCat(
" denominator: ",
envoy_type_FractionalPercent_denominator(drop_percentage)));
fields.emplace_back(" }");
}
fields.emplace_back(" }");
}
// overprovisioning_factor
fields.emplace_back("}");
}
gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s", client,
absl::StrJoin(fields, "\n").c_str());
}
}
// Better match type has smaller value.
enum MatchType {
EXACT_MATCH,
@ -449,8 +949,10 @@ MatchType DomainPatternMatchType(const std::string& domain_pattern) {
}
grpc_error* RouteConfigParse(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_RouteConfiguration* route_config,
const std::string& expected_server_name, XdsApi::RdsUpdate* rds_update) {
MaybeLogRouteConfiguration(client, tracer, route_config);
// Get the virtual hosts.
size_t size;
const envoy_api_v2_route_VirtualHost* const* virtual_hosts =
@ -540,7 +1042,8 @@ grpc_error* RouteConfigParse(
return GRPC_ERROR_NONE;
}
grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
XdsApi::LdsUpdate* lds_update, upb_arena* arena) {
// Get the resources from the response.
@ -590,8 +1093,8 @@ grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
envoy_config_filter_network_http_connection_manager_v2_HttpConnectionManager_route_config(
http_connection_manager);
XdsApi::RdsUpdate rds_update;
grpc_error* error =
RouteConfigParse(route_config, expected_server_name, &rds_update);
grpc_error* error = RouteConfigParse(client, tracer, route_config,
expected_server_name, &rds_update);
if (error != GRPC_ERROR_NONE) return error;
lds_update->rds_update.emplace(std::move(rds_update));
const upb_strview route_config_name =
@ -621,7 +1124,8 @@ grpc_error* LdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
"No listener found for expected server name.");
}
grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
XdsApi::RdsUpdate* rds_update, upb_arena* arena) {
@ -655,8 +1159,8 @@ grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
if (!upb_strview_eql(name, expected_name)) continue;
// Parse the route_config.
XdsApi::RdsUpdate local_rds_update;
grpc_error* error =
RouteConfigParse(route_config, expected_server_name, &local_rds_update);
grpc_error* error = RouteConfigParse(
client, tracer, route_config, expected_server_name, &local_rds_update);
if (error != GRPC_ERROR_NONE) return error;
*rds_update = std::move(local_rds_update);
return GRPC_ERROR_NONE;
@ -665,7 +1169,8 @@ grpc_error* RdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
"No route config found for expected name.");
}
grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
grpc_error* CdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
XdsApi::CdsUpdateMap* cds_update_map,
upb_arena* arena) {
// Get the resources from the response.
@ -691,6 +1196,7 @@ grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
if (cluster == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster.");
}
MaybeLogCluster(client, tracer, cluster);
// Check the cluster_discovery_type.
if (!envoy_api_v2_Cluster_has_type(cluster)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
@ -849,6 +1355,7 @@ grpc_error* DropParseAndAppend(
}
grpc_error* EdsResponsedParse(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::set<StringView>& expected_eds_service_names,
XdsApi::EdsUpdateMap* eds_update_map, upb_arena* arena) {
@ -878,6 +1385,7 @@ grpc_error* EdsResponsedParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Can't parse cluster_load_assignment.");
}
MaybeLogClusterLoadAssignment(client, tracer, cluster_load_assignment);
// Check the cluster name (which actually means eds_service_name). Ignore
// unexpected names.
upb_strview cluster_name = envoy_api_v2_ClusterLoadAssignment_cluster_name(
@ -950,6 +1458,7 @@ grpc_error* XdsApi::ParseAdsResponse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Can't decode the whole response.");
}
MaybeLogDiscoveryResponse(client_, tracer_, response);
// Record the type_url, the version_info, and the nonce of the response.
upb_strview type_url_strview =
envoy_api_v2_DiscoveryResponse_type_url(response);
@ -961,17 +1470,19 @@ grpc_error* XdsApi::ParseAdsResponse(
*nonce = std::string(nonce_strview.data, nonce_strview.size);
// Parse the response according to the resource type.
if (*type_url == kLdsTypeUrl) {
return LdsResponseParse(response, expected_server_name, lds_update,
arena.ptr());
return LdsResponseParse(client_, tracer_, response, expected_server_name,
lds_update, arena.ptr());
} else if (*type_url == kRdsTypeUrl) {
return RdsResponseParse(response, expected_server_name,
return RdsResponseParse(client_, tracer_, response, expected_server_name,
expected_route_config_name, rds_update,
arena.ptr());
} else if (*type_url == kCdsTypeUrl) {
return CdsResponseParse(response, cds_update_map, arena.ptr());
return CdsResponseParse(client_, tracer_, response, cds_update_map,
arena.ptr());
} else if (*type_url == kEdsTypeUrl) {
return EdsResponsedParse(response, expected_eds_service_names,
eds_update_map, arena.ptr());
return EdsResponsedParse(client_, tracer_, response,
expected_eds_service_names, eds_update_map,
arena.ptr());
} else {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Unsupported ADS resource type.");
@ -980,6 +1491,121 @@ grpc_error* XdsApi::ParseAdsResponse(
namespace {
void MaybeLogLrsRequest(
XdsClient* client, TraceFlag* tracer,
const envoy_service_load_stats_v2_LoadStatsRequest* request) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer) &&
gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
// TODO(roth): When we can upgrade upb, use upb textformat code to dump
// the raw proto instead of doing this manually.
std::vector<std::string> fields;
// node
const auto* node =
envoy_service_load_stats_v2_LoadStatsRequest_node(request);
if (node != nullptr) {
AddNodeLogFields(node, &fields);
}
// cluster_stats
size_t num_cluster_stats;
const struct envoy_api_v2_endpoint_ClusterStats* const* cluster_stats =
envoy_service_load_stats_v2_LoadStatsRequest_cluster_stats(
request, &num_cluster_stats);
for (size_t i = 0; i < num_cluster_stats; ++i) {
const auto* cluster_stat = cluster_stats[i];
fields.emplace_back("cluster_stats {");
// cluster_name
AddStringField(
" cluster_name",
envoy_api_v2_endpoint_ClusterStats_cluster_name(cluster_stat),
&fields);
// cluster_service_name
AddStringField(
" cluster_service_name",
envoy_api_v2_endpoint_ClusterStats_cluster_service_name(cluster_stat),
&fields);
// upstream_locality_stats
size_t num_stats;
const envoy_api_v2_endpoint_UpstreamLocalityStats* const* stats =
envoy_api_v2_endpoint_ClusterStats_upstream_locality_stats(
cluster_stat, &num_stats);
for (size_t j = 0; j < num_stats; ++j) {
const auto* stat = stats[j];
fields.emplace_back(" upstream_locality_stats {");
// locality
const auto* locality =
envoy_api_v2_endpoint_UpstreamLocalityStats_locality(stat);
if (locality != nullptr) {
fields.emplace_back(" locality {");
AddLocalityField(3, locality, &fields);
fields.emplace_back(" }");
}
// total_successful_requests
fields.emplace_back(absl::StrCat(
" total_successful_requests: ",
envoy_api_v2_endpoint_UpstreamLocalityStats_total_successful_requests(
stat)));
// total_requests_in_progress
fields.emplace_back(absl::StrCat(
" total_requests_in_progress: ",
envoy_api_v2_endpoint_UpstreamLocalityStats_total_requests_in_progress(
stat)));
// total_error_requests
fields.emplace_back(absl::StrCat(
" total_error_requests: ",
envoy_api_v2_endpoint_UpstreamLocalityStats_total_error_requests(
stat)));
// total_issued_requests
fields.emplace_back(absl::StrCat(
" total_issued_requests: ",
envoy_api_v2_endpoint_UpstreamLocalityStats_total_issued_requests(
stat)));
fields.emplace_back(" }");
}
// total_dropped_requests
fields.emplace_back(absl::StrCat(
" total_dropped_requests: ",
envoy_api_v2_endpoint_ClusterStats_total_dropped_requests(
cluster_stat)));
// dropped_requests
size_t num_drops;
const envoy_api_v2_endpoint_ClusterStats_DroppedRequests* const* drops =
envoy_api_v2_endpoint_ClusterStats_dropped_requests(cluster_stat,
&num_drops);
for (size_t j = 0; j < num_drops; ++j) {
const auto* drop = drops[j];
fields.emplace_back(" dropped_requests {");
// category
AddStringField(
" category",
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_category(drop),
&fields);
// dropped_count
fields.emplace_back(absl::StrCat(
" dropped_count: ",
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_dropped_count(
drop)));
fields.emplace_back(" }");
}
// load_report_interval
const auto* load_report_interval =
envoy_api_v2_endpoint_ClusterStats_load_report_interval(cluster_stat);
if (load_report_interval != nullptr) {
fields.emplace_back(" load_report_interval {");
fields.emplace_back(absl::StrCat(
" seconds: ",
google_protobuf_Duration_seconds(load_report_interval)));
fields.emplace_back(
absl::StrCat(" nanos: ",
google_protobuf_Duration_nanos(load_report_interval)));
fields.emplace_back(" }");
}
fields.emplace_back("}");
}
gpr_log(GPR_DEBUG, "[xds_client %p] constructed LRS request: %s", client,
absl::StrJoin(fields, "\n").c_str());
}
}
grpc_slice SerializeLrsRequest(
const envoy_service_load_stats_v2_LoadStatsRequest* request,
upb_arena* arena) {
@ -1002,6 +1628,7 @@ grpc_slice XdsApi::CreateLrsInitialRequest(const std::string& server_name) {
arena.ptr());
PopulateNode(arena.ptr(), node_, build_version_, user_agent_name_,
server_name, node_msg);
MaybeLogLrsRequest(client_, tracer_, request);
return SerializeLrsRequest(request, arena.ptr());
}
@ -1114,6 +1741,7 @@ grpc_slice XdsApi::CreateLrsRequest(
google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
}
MaybeLogLrsRequest(client_, tracer_, request);
return SerializeLrsRequest(request, arena.ptr());
}

@ -34,6 +34,8 @@
namespace grpc_core {
class XdsClient;
class XdsApi {
public:
static const char* kLdsTypeUrl;
@ -187,7 +189,7 @@ class XdsApi {
std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
ClusterLoadReport>;
explicit XdsApi(const XdsBootstrap::Node* node);
XdsApi(XdsClient* client, TraceFlag* tracer, const XdsBootstrap::Node* node);
// Creates a request to nack an unsupported resource type.
// Takes ownership of \a error.
@ -249,6 +251,8 @@ class XdsApi {
grpc_millis* load_reporting_interval);
private:
XdsClient* client_;
TraceFlag* tracer_;
const XdsBootstrap::Node* node_;
const std::string build_version_;
const std::string user_agent_name_;

@ -29,20 +29,97 @@
namespace grpc_core {
std::unique_ptr<XdsBootstrap> XdsBootstrap::ReadFromFile(grpc_error** error) {
namespace {
UniquePtr<char> BootstrapString(const XdsBootstrap& bootstrap) {
gpr_strvec v;
gpr_strvec_init(&v);
char* tmp;
if (bootstrap.node() != nullptr) {
gpr_asprintf(&tmp,
"node={\n"
" id=\"%s\",\n"
" cluster=\"%s\",\n"
" locality={\n"
" region=\"%s\",\n"
" zone=\"%s\",\n"
" subzone=\"%s\"\n"
" },\n"
" metadata=%s,\n"
"},\n",
bootstrap.node()->id.c_str(),
bootstrap.node()->cluster.c_str(),
bootstrap.node()->locality_region.c_str(),
bootstrap.node()->locality_zone.c_str(),
bootstrap.node()->locality_subzone.c_str(),
bootstrap.node()->metadata.Dump().c_str());
gpr_strvec_add(&v, tmp);
}
gpr_asprintf(&tmp,
"servers=[\n"
" {\n"
" uri=\"%s\",\n"
" creds=[\n",
bootstrap.server().server_uri.c_str());
gpr_strvec_add(&v, tmp);
for (size_t i = 0; i < bootstrap.server().channel_creds.size(); ++i) {
const auto& creds = bootstrap.server().channel_creds[i];
gpr_asprintf(&tmp, " {type=\"%s\", config=%s},\n", creds.type.c_str(),
creds.config.Dump().c_str());
gpr_strvec_add(&v, tmp);
}
gpr_strvec_add(&v, gpr_strdup(" ]\n }\n]"));
UniquePtr<char> result(gpr_strvec_flatten(&v, nullptr));
gpr_strvec_destroy(&v);
return result;
}
} // namespace
std::unique_ptr<XdsBootstrap> XdsBootstrap::ReadFromFile(XdsClient* client,
TraceFlag* tracer,
grpc_error** error) {
grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
if (path == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"GRPC_XDS_BOOTSTRAP env var not set");
"Environment variable GRPC_XDS_BOOTSTRAP not defined");
return nullptr;
}
if (GRPC_TRACE_FLAG_ENABLED(*tracer)) {
gpr_log(GPR_INFO,
"[xds_client %p] Got bootstrap file location from "
"GRPC_XDS_BOOTSTRAP environment variable: %s",
client, path.get());
}
grpc_slice contents;
*error = grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
if (*error != GRPC_ERROR_NONE) return nullptr;
Json json = Json::Parse(StringViewFromSlice(contents), error);
StringView contents_str_view = StringViewFromSlice(contents);
if (GRPC_TRACE_FLAG_ENABLED(*tracer)) {
UniquePtr<char> str = StringViewToCString(contents_str_view);
gpr_log(GPR_DEBUG, "[xds_client %p] Bootstrap file contents: %s", client,
str.get());
}
Json json = Json::Parse(contents_str_view, error);
grpc_slice_unref_internal(contents);
if (*error != GRPC_ERROR_NONE) return nullptr;
return absl::make_unique<XdsBootstrap>(std::move(json), error);
if (*error != GRPC_ERROR_NONE) {
char* msg;
gpr_asprintf(&msg, "Failed to parse bootstrap file %s", path.get());
grpc_error* error_out =
GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(msg, error, 1);
gpr_free(msg);
GRPC_ERROR_UNREF(*error);
*error = error_out;
return nullptr;
}
std::unique_ptr<XdsBootstrap> result =
absl::make_unique<XdsBootstrap>(std::move(json), error);
if (*error == GRPC_ERROR_NONE && GRPC_TRACE_FLAG_ENABLED(*tracer)) {
gpr_log(GPR_INFO,
"[xds_client %p] Bootstrap config for creating xds client:\n%s",
client, BootstrapString(*result).get());
}
return result;
}
XdsBootstrap::XdsBootstrap(Json json, grpc_error** error) {

@ -33,6 +33,8 @@
namespace grpc_core {
class XdsClient;
class XdsBootstrap {
public:
struct Node {
@ -56,7 +58,9 @@ class XdsBootstrap {
// If *error is not GRPC_ERROR_NONE after returning, then there was an
// error reading the file.
static std::unique_ptr<XdsBootstrap> ReadFromFile(grpc_error** error);
static std::unique_ptr<XdsBootstrap> ReadFromFile(XdsClient* client,
TraceFlag* tracer,
grpc_error** error);
// Do not instantiate directly -- use ReadFromFile() above instead.
XdsBootstrap(Json json, grpc_error** error);

@ -22,6 +22,8 @@
#include <limits.h>
#include <string.h>
#include "absl/strings/str_join.h"
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@ -788,33 +790,46 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
return;
}
auto& state = state_map_[type_url];
grpc_error* error = state.error;
state.error = GRPC_ERROR_NONE;
grpc_slice request_payload_slice;
std::set<StringView> resource_names;
if (type_url == XdsApi::kLdsTypeUrl) {
resource_names.insert(xds_client()->server_name_);
request_payload_slice = xds_client()->api_.CreateLdsRequest(
xds_client()->server_name_, state.version, state.nonce, error,
!sent_initial_message_);
xds_client()->server_name_, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->server_name_]->Start(Ref());
} else if (type_url == XdsApi::kRdsTypeUrl) {
resource_names.insert(xds_client()->route_config_name_);
request_payload_slice = xds_client()->api_.CreateRdsRequest(
xds_client()->route_config_name_, state.version, state.nonce, error,
!sent_initial_message_);
xds_client()->route_config_name_, state.version, state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
state.subscribed_resources[xds_client()->route_config_name_]->Start(Ref());
} else if (type_url == XdsApi::kCdsTypeUrl) {
resource_names = ClusterNamesForRequest();
request_payload_slice = xds_client()->api_.CreateCdsRequest(
ClusterNamesForRequest(), state.version, state.nonce, error,
resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error),
!sent_initial_message_);
} else if (type_url == XdsApi::kEdsTypeUrl) {
resource_names = EdsServiceNamesForRequest();
request_payload_slice = xds_client()->api_.CreateEdsRequest(
EdsServiceNamesForRequest(), state.version, state.nonce, error,
resource_names, state.version, state.nonce, GRPC_ERROR_REF(state.error),
!sent_initial_message_);
} else {
request_payload_slice = xds_client()->api_.CreateUnsupportedTypeNackRequest(
type_url, state.nonce, state.error);
type_url, state.nonce, GRPC_ERROR_REF(state.error));
state_map_.erase(type_url);
}
sent_initial_message_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
"error=%s resources=%s",
xds_client(), type_url.c_str(), state.version.c_str(),
state.nonce.c_str(), grpc_error_string(state.error),
absl::StrJoin(resource_names, " ").c_str());
}
GRPC_ERROR_UNREF(state.error);
state.error = GRPC_ERROR_NONE;
// Create message payload.
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
@ -1150,7 +1165,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
grpc_slice_unref_internal(response_slice);
if (type_url.empty()) {
// Ignore unparsable response.
gpr_log(GPR_ERROR, "[xds_client %p] No type_url found. error=%s",
gpr_log(GPR_ERROR,
"[xds_client %p] Error parsing ADS response (%s) -- ignoring",
xds_client, grpc_error_string(parse_error));
GRPC_ERROR_UNREF(parse_error);
} else {
@ -1162,10 +1178,11 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
GRPC_ERROR_UNREF(state.error);
state.error = parse_error;
// NACK unacceptable update.
gpr_log(
GPR_ERROR,
"[xds_client %p] ADS response can't be accepted, NACKing. error=%s",
xds_client, grpc_error_string(parse_error));
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response invalid for resource type %s "
"version %s, will NACK: nonce=%s error=%s",
xds_client, type_url.c_str(), version.c_str(),
state.nonce.c_str(), grpc_error_string(parse_error));
ads_calld->SendMessageLocked(type_url);
} else {
ads_calld->seen_response_ = true;
@ -1727,10 +1744,15 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
request_timeout_(GetRequestTimeout(channel_args)),
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
interested_parties_(interested_parties),
bootstrap_(XdsBootstrap::ReadFromFile(error)),
api_(bootstrap_ == nullptr ? nullptr : bootstrap_->node()),
bootstrap_(
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
api_(this, &grpc_xds_client_trace,
bootstrap_ == nullptr ? nullptr : bootstrap_->node()),
server_name_(server_name),
service_config_watcher_(std::move(watcher)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
}
if (*error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
this, grpc_error_string(*error));
@ -1755,9 +1777,17 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
}
}
XdsClient::~XdsClient() { GRPC_COMBINER_UNREF(combiner_, "xds_client"); }
XdsClient::~XdsClient() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
}
GRPC_COMBINER_UNREF(combiner_, "xds_client");
}
void XdsClient::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
}
shutting_down_ = true;
chand_.reset();
// We do not clear cluster_map_ and endpoint_map_ if the xds client was
@ -1782,6 +1812,10 @@ void XdsClient::WatchClusterData(
// If we've already received an CDS update, notify the new watcher
// immediately.
if (cluster_state.update.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
this, StringViewToCString(cluster_name).get());
}
w->OnClusterChanged(cluster_state.update.value());
}
chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
@ -1812,6 +1846,10 @@ void XdsClient::WatchEndpointData(
// If we've already received an EDS update, notify the new watcher
// immediately.
if (!endpoint_state.update.priority_list_update.empty()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
this, StringViewToCString(eds_service_name).get());
}
w->OnEndpointChanged(endpoint_state.update);
}
chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);

Loading…
Cancel
Save