Merge pull request #20340 from AspirinSJL/cluster_name

Use cluster name from LRS response to report loads
reviewable/pr20397/r1
Juanli Shen 5 years ago committed by GitHub
commit a15e56d213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  2. 11
      src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc
  3. 10
      src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h

@ -328,6 +328,7 @@ class XdsLb : public LoadBalancingPolicy {
grpc_closure on_status_received_; grpc_closure on_status_received_;
// Load reporting state. // Load reporting state.
UniquePtr<char> cluster_name_;
grpc_millis load_reporting_interval_ = 0; grpc_millis load_reporting_interval_ = 0;
OrphanablePtr<Reporter> reporter_; OrphanablePtr<Reporter> reporter_;
}; };
@ -1314,7 +1315,7 @@ void XdsLb::LbChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
void XdsLb::LbChannelState::LrsCallState::Reporter::SendReportLocked() { void XdsLb::LbChannelState::LrsCallState::Reporter::SendReportLocked() {
// Create a request that contains the load report. // Create a request that contains the load report.
grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode( grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode(
xdslb_policy()->server_name_, &xdslb_policy()->client_stats_); parent_->cluster_name_.get(), &xdslb_policy()->client_stats_);
// Skip client load report if the counters were all zero in the last // Skip client load report if the counters were all zero in the last
// report and they are still zero in this one. // report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_; const bool old_val = last_report_counters_were_zero_;
@ -1535,10 +1536,10 @@ void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked(
// This anonymous lambda is a hack to avoid the usage of goto. // This anonymous lambda is a hack to avoid the usage of goto.
[&]() { [&]() {
// Parse the response. // Parse the response.
UniquePtr<char> new_cluster_name;
grpc_millis new_load_reporting_interval; grpc_millis new_load_reporting_interval;
grpc_error* parse_error = XdsLrsResponseDecodeAndParse( grpc_error* parse_error = XdsLrsResponseDecodeAndParse(
response_slice, &new_load_reporting_interval, response_slice, &new_cluster_name, &new_load_reporting_interval);
xdslb_policy->server_name_);
if (parse_error != GRPC_ERROR_NONE) { if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "[xdslb %p] LRS response parsing failed. error=%s", gpr_log(GPR_ERROR, "[xdslb %p] LRS response parsing failed. error=%s",
xdslb_policy, grpc_error_string(parse_error)); xdslb_policy, grpc_error_string(parse_error));
@ -1548,9 +1549,10 @@ void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked(
lrs_calld->seen_response_ = true; lrs_calld->seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xdslb %p] LRS response received, load_report_interval=%" PRId64 "[xdslb %p] LRS response received, cluster_name=%s, "
"ms", "load_report_interval=%" PRId64 "ms",
xdslb_policy, new_load_reporting_interval); xdslb_policy, new_cluster_name.get(),
new_load_reporting_interval);
} }
if (new_load_reporting_interval < if (new_load_reporting_interval <
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) { GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
@ -1564,7 +1566,8 @@ void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked(
} }
} }
// Ignore identical update. // Ignore identical update.
if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval) { if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval &&
strcmp(lrs_calld->cluster_name_.get(), new_cluster_name.get()) == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xdslb %p] Incoming LRS response identical to current, " "[xdslb %p] Incoming LRS response identical to current, "
@ -1573,9 +1576,10 @@ void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked(
} }
return; return;
} }
// Stop current load reporting (if any) to adopt the new reporting interval. // Stop current load reporting (if any) to adopt the new config.
lrs_calld->reporter_.reset(); lrs_calld->reporter_.reset();
// Record the new config. // Record the new config.
lrs_calld->cluster_name_ = std::move(new_cluster_name);
lrs_calld->load_reporting_interval_ = new_load_reporting_interval; lrs_calld->load_reporting_interval_ = new_load_reporting_interval;
// Try starting sending load report. // Try starting sending load report.
lrs_calld->MaybeStartReportingLocked(); lrs_calld->MaybeStartReportingLocked();

@ -414,8 +414,8 @@ grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
} }
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response, grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
grpc_millis* load_reporting_interval, UniquePtr<char>* cluster_name,
const char* expected_server_name) { grpc_millis* load_reporting_interval) {
upb::Arena arena; upb::Arena arena;
// Decode the response. // Decode the response.
const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response = const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
@ -435,11 +435,8 @@ grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"The number of clusters (server names) is not 1."); "The number of clusters (server names) is not 1.");
} }
// Check the cluster name in the response // Get the cluster name for reporting loads.
if (strncmp(expected_server_name, clusters[0].data, clusters[0].size) != 0) { *cluster_name = StringCopy(clusters[0]);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Unexpected cluster (server name).");
}
// Get the load report interval. // Get the load report interval.
const google_protobuf_Duration* load_reporting_interval_duration = const google_protobuf_Duration* load_reporting_interval_duration =
envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval( envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(

@ -114,12 +114,12 @@ grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name);
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name, grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
XdsClientStats* client_stats); XdsClientStats* client_stats);
// Parses the LRS response and returns the client-side load reporting interval. // Parses the LRS response and returns \a cluster_name and \a
// If there is any error (e.g., the found server name doesn't match \a // load_reporting_interval for client-side load reporting. If there is any
// expected_server_name), the output config is invalid. // error, the output config is invalid.
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response, grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
grpc_millis* load_reporting_interval, UniquePtr<char>* cluster_name,
const char* expected_server_name); grpc_millis* load_reporting_interval);
} // namespace grpc_core } // namespace grpc_core

Loading…
Cancel
Save