Use LRS in xds policy

pull/19394/head
Juanli Shen 6 years ago
parent b4504ea781
commit 45dd8be442
Changed files (changed line counts in parentheses):
  1. BUILD (2)
  2. CMakeLists.txt (17)
  3. Makefile (33)
  4. build.yaml (5)
  5. src/core/ext/filters/client_channel/client_channel.cc (6)
  6. src/core/ext/filters/client_channel/lb_policy.h (5)
  7. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (1245)
  8. src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc (183)
  9. src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h (210)
  10. src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc (192)
  11. src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h (66)
  12. src/core/lib/gprpp/atomic.h (4)
  13. src/core/lib/gprpp/map.h (2)
  14. src/core/lib/transport/static_metadata.cc (614)
  15. src/core/lib/transport/static_metadata.h (151)
  16. src/proto/grpc/lb/v2/BUILD (14)
  17. src/proto/grpc/lb/v2/eds_for_test.proto (0)
  18. src/proto/grpc/lb/v2/lrs_for_test.proto (180)
  19. test/core/end2end/fuzzers/hpack.dictionary (1)
  20. test/core/util/test_lb_policies.cc (2)
  21. test/cpp/end2end/BUILD (4)
  22. test/cpp/end2end/xds_end2end_test.cc (914)
  23. tools/codegen/core/gen_static_metadata.py (1)
  24. tools/run_tests/generated/sources_and_headers.json (11)

@@ -1262,7 +1262,6 @@ grpc_cc_library(
"envoy_ads_upb",
"grpc_base",
"grpc_client_channel",
"grpc_lb_upb",
"grpc_resolver_fake",
],
)
@@ -1286,7 +1285,6 @@ grpc_cc_library(
"envoy_ads_upb",
"grpc_base",
"grpc_client_channel",
"grpc_lb_upb",
"grpc_resolver_fake",
"grpc_secure",
],

@@ -17674,17 +17674,24 @@ endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(xds_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/xds_for_test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/xds_for_test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/xds_for_test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/xds_for_test.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/eds_for_test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/eds_for_test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/lrs_for_test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/lrs_for_test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.h
test/cpp/end2end/xds_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
protobuf_generate_grpc_cpp(
src/proto/grpc/lb/v2/xds_for_test.proto
src/proto/grpc/lb/v2/eds_for_test.proto
)
protobuf_generate_grpc_cpp(
src/proto/grpc/lb/v2/lrs_for_test.proto
)
target_include_directories(xds_end2end_test

@@ -2700,16 +2700,32 @@ $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc: src/proto/grpc/lb/v1/lo
endif
ifeq ($(NO_PROTOC),true)
$(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.pb.cc: protoc_dep_error
$(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.grpc.pb.cc: protoc_dep_error
$(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc: protoc_dep_error
$(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.cc: protoc_dep_error
else
$(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.pb.cc: src/proto/grpc/lb/v2/xds_for_test.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc: src/proto/grpc/lb/v2/eds_for_test.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --cpp_out=$(GENDIR) $<
$(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.grpc.pb.cc: src/proto/grpc/lb/v2/xds_for_test.proto $(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.pb.cc $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.cc: src/proto/grpc/lb/v2/eds_for_test.proto $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc $(PROTOBUF_DEP) $(PROTOC_PLUGINS)
$(E) "[GRPC] Generating gRPC's protobuf service CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(PROTOC_PLUGINS_DIR)/grpc_cpp_plugin$(EXECUTABLE_SUFFIX) $<
endif
ifeq ($(NO_PROTOC),true)
$(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.pb.cc: protoc_dep_error
$(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.cc: protoc_dep_error
else
$(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.pb.cc: src/proto/grpc/lb/v2/lrs_for_test.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS) $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc
$(E) "[PROTOC] Generating protobuf CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --cpp_out=$(GENDIR) $<
$(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.cc: src/proto/grpc/lb/v2/lrs_for_test.proto $(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.pb.cc $(PROTOBUF_DEP) $(PROTOC_PLUGINS) $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.cc
$(E) "[GRPC] Generating gRPC's protobuf service CC file from $<"
$(Q) mkdir -p `dirname $@`
$(Q) $(PROTOC) -Ithird_party/protobuf/src -I. --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(PROTOC_PLUGINS_DIR)/grpc_cpp_plugin$(EXECUTABLE_SUFFIX) $<
@@ -19744,7 +19760,8 @@ endif
XDS_END2END_TEST_SRC = \
$(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.grpc.pb.cc \
$(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.cc \
$(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.cc \
test/cpp/end2end/xds_end2end_test.cc \
XDS_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(XDS_END2END_TEST_SRC))))
@@ -19776,7 +19793,9 @@ endif
endif
$(OBJDIR)/$(CONFIG)/src/proto/grpc/lb/v2/xds_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/src/proto/grpc/lb/v2/eds_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/src/proto/grpc/lb/v2/lrs_for_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
@@ -19787,7 +19806,7 @@ ifneq ($(NO_DEPS),true)
-include $(XDS_END2END_TEST_OBJS:.o=.dep)
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/xds_for_test.grpc.pb.cc
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/eds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.pb.cc $(GENDIR)/src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.cc
PUBLIC_HEADERS_MUST_BE_C89_SRC = \

@@ -833,7 +833,6 @@ filegroups:
- grpc_base
- grpc_client_channel
- grpc_resolver_fake
- grpc_lb_upb
- name: grpc_lb_policy_xds_secure
headers:
- src/core/ext/filters/client_channel/lb_policy/xds/xds.h
@@ -852,7 +851,6 @@ filegroups:
- grpc_client_channel
- grpc_resolver_fake
- grpc_secure
- grpc_lb_upb
- name: grpc_lb_subchannel_list
headers:
- src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -6027,7 +6025,8 @@ targets:
build: test
language: c++
src:
- src/proto/grpc/lb/v2/xds_for_test.proto
- src/proto/grpc/lb/v2/eds_for_test.proto
- src/proto/grpc/lb/v2/lrs_for_test.proto
- test/cpp/end2end/xds_end2end_test.cc
deps:
- grpc++_test_util

@@ -708,7 +708,7 @@ class CallData {
LbCallState lb_call_state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
void (*lb_recv_trailing_metadata_ready_)(
void* user_data,
void* user_data, grpc_error* error,
LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
LoadBalancingPolicy::CallState* call_state) = nullptr;
void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
@@ -2160,8 +2160,8 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
// Invoke callback to LB policy.
Metadata trailing_metadata(calld, calld->recv_trailing_metadata_);
calld->lb_recv_trailing_metadata_ready_(
calld->lb_recv_trailing_metadata_ready_user_data_, &trailing_metadata,
&calld->lb_call_state_);
calld->lb_recv_trailing_metadata_ready_user_data_, error,
&trailing_metadata, &calld->lb_call_state_);
// Chain to original callback.
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));

@@ -174,8 +174,11 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// modified by the callback. The callback does not take ownership,
/// however, so any data that needs to be used after returning must
/// be copied.
// TODO(roth): Replace grpc_error with something better before we allow
// people outside of gRPC team to use this API.
void (*recv_trailing_metadata_ready)(
void* user_data, MetadataInterface* recv_trailing_metadata,
void* user_data, grpc_error* error,
MetadataInterface* recv_trailing_metadata,
CallState* call_state) = nullptr;
void* recv_trailing_metadata_ready_user_data = nullptr;
};
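For illustration, a minimal sketch (not part of this diff) of how a policy might wire the updated three-argument callback into the result it hands back from a pick. Only the callback signature comes from the hunk above; OnTrailingMetadataForLoadReporting, the use of XdsClientStats::LocalityStats, and the field assignments at the end are assumptions.

#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"

namespace grpc_core {

// Hypothetical callback matching the new signature: the grpc_error* lets the
// policy classify the finished call before recording it for load reporting.
static void OnTrailingMetadataForLoadReporting(
    void* user_data, grpc_error* error,
    LoadBalancingPolicy::MetadataInterface* /*recv_trailing_metadata*/,
    LoadBalancingPolicy::CallState* /*call_state*/) {
  auto* locality_stats =
      static_cast<XdsClientStats::LocalityStats*>(user_data);
  // Count any non-OK status as an errored request in the next LRS report.
  locality_stats->AddCallFinished(/*fail=*/error != GRPC_ERROR_NONE);
}

// A picker would then set, on the result it returns (exact struct name not
// shown in this hunk):
//   result.recv_trailing_metadata_ready = OnTrailingMetadataForLoadReporting;
//   result.recv_trailing_metadata_ready_user_data = locality_stats;

}  // namespace grpc_core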

File diff suppressed because it is too large (src/core/ext/filters/client_channel/lb_policy/xds/xds.cc).

@@ -26,60 +26,163 @@
namespace grpc_core {
void XdsLbClientStats::AddCallStarted() {
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
namespace {
template <typename T>
T GetAndResetCounter(Atomic<T>* from) {
return from->Exchange(0, MemoryOrder::RELAXED);
}
} // namespace
//
// XdsClientStats::LocalityStats::LoadMetric::Snapshot
//
bool XdsClientStats::LocalityStats::LoadMetric::Snapshot::IsAllZero() const {
return total_metric_value == 0 && num_requests_finished_with_metric == 0;
}
//
// XdsClientStats::LocalityStats::LoadMetric
//
XdsClientStats::LocalityStats::LoadMetric::Snapshot
XdsClientStats::LocalityStats::LoadMetric::GetSnapshotAndReset() {
Snapshot metric = {num_requests_finished_with_metric_, total_metric_value_};
num_requests_finished_with_metric_ = 0;
total_metric_value_ = 0;
return metric;
}
void XdsLbClientStats::AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received) {
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(&num_calls_finished_with_client_failed_to_send_,
(gpr_atm)1);
//
// XdsClientStats::LocalityStats::Snapshot
//
bool XdsClientStats::LocalityStats::Snapshot::IsAllZero() {
if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
total_error_requests != 0 || total_issued_requests != 0) {
return false;
}
if (finished_known_received) {
gpr_atm_full_fetch_add(&num_calls_finished_known_received_, (gpr_atm)1);
for (auto& p : load_metric_stats) {
const LoadMetric::Snapshot& metric_value = p.second;
if (!metric_value.IsAllZero()) return false;
}
return true;
}
void XdsLbClientStats::AddCallDroppedLocked(char* token) {
// Increment num_calls_started and num_calls_finished.
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
// Record the drop.
if (drop_token_counts_ == nullptr) {
drop_token_counts_.reset(New<DroppedCallCounts>());
}
for (size_t i = 0; i < drop_token_counts_->size(); ++i) {
if (strcmp((*drop_token_counts_)[i].token.get(), token) == 0) {
++(*drop_token_counts_)[i].count;
return;
//
// XdsClientStats::LocalityStats
//
XdsClientStats::LocalityStats::Snapshot
XdsClientStats::LocalityStats::GetSnapshotAndReset() {
Snapshot snapshot = {
GetAndResetCounter(&total_successful_requests_),
// Don't reset total_requests_in_progress because it's not
// related to a single reporting interval.
total_requests_in_progress_.Load(MemoryOrder::RELAXED),
GetAndResetCounter(&total_error_requests_),
GetAndResetCounter(&total_issued_requests_)};
{
MutexLock lock(&load_metric_stats_mu_);
for (auto& p : load_metric_stats_) {
const char* metric_name = p.first.get();
LoadMetric& metric_value = p.second;
snapshot.load_metric_stats.emplace(
UniquePtr<char>(gpr_strdup(metric_name)),
metric_value.GetSnapshotAndReset());
}
}
// Not found, so add a new entry.
drop_token_counts_->emplace_back(UniquePtr<char>(gpr_strdup(token)), 1);
return snapshot;
}
namespace {
void XdsClientStats::LocalityStats::AddCallStarted() {
total_issued_requests_.FetchAdd(1, MemoryOrder::RELAXED);
total_requests_in_progress_.FetchAdd(1, MemoryOrder::RELAXED);
}
void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) {
*value = static_cast<int64_t>(gpr_atm_full_xchg(counter, (gpr_atm)0));
void XdsClientStats::LocalityStats::AddCallFinished(bool fail) {
Atomic<uint64_t>& to_increment =
fail ? total_error_requests_ : total_successful_requests_;
to_increment.FetchAdd(1, MemoryOrder::RELAXED);
total_requests_in_progress_.FetchAdd(-1, MemoryOrder::ACQ_REL);
}
} // namespace
//
// XdsClientStats::Snapshot
//
void XdsLbClientStats::GetLocked(
int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
UniquePtr<DroppedCallCounts>* drop_token_counts) {
AtomicGetAndResetCounter(num_calls_started, &num_calls_started_);
AtomicGetAndResetCounter(num_calls_finished, &num_calls_finished_);
AtomicGetAndResetCounter(num_calls_finished_with_client_failed_to_send,
&num_calls_finished_with_client_failed_to_send_);
AtomicGetAndResetCounter(num_calls_finished_known_received,
&num_calls_finished_known_received_);
*drop_token_counts = std::move(drop_token_counts_);
bool XdsClientStats::Snapshot::IsAllZero() {
for (auto& p : upstream_locality_stats) {
if (!p.second.IsAllZero()) return false;
}
for (auto& p : dropped_requests) {
if (p.second != 0) return false;
}
return total_dropped_requests == 0;
}
//
// XdsClientStats
//
XdsClientStats::Snapshot XdsClientStats::GetSnapshotAndReset() {
grpc_millis now = ExecCtx::Get()->Now();
// Record total_dropped_requests and reporting interval in the snapshot.
Snapshot snapshot;
snapshot.total_dropped_requests =
GetAndResetCounter(&total_dropped_requests_);
snapshot.load_report_interval = now - last_report_time_;
// Update last report time.
last_report_time_ = now;
// Snapshot all the other stats.
for (auto& p : upstream_locality_stats_) {
snapshot.upstream_locality_stats.emplace(p.first,
p.second->GetSnapshotAndReset());
}
{
MutexLock lock(&dropped_requests_mu_);
snapshot.dropped_requests = std::move(dropped_requests_);
}
return snapshot;
}
void XdsClientStats::MaybeInitLastReportTime() {
if (last_report_time_ == -1) last_report_time_ = ExecCtx::Get()->Now();
}
RefCountedPtr<XdsClientStats::LocalityStats> XdsClientStats::FindLocalityStats(
const RefCountedPtr<XdsLocalityName>& locality_name) {
auto iter = upstream_locality_stats_.find(locality_name);
if (iter == upstream_locality_stats_.end()) {
iter = upstream_locality_stats_
.emplace(locality_name, MakeRefCounted<LocalityStats>())
.first;
}
return iter->second;
}
void XdsClientStats::PruneLocalityStats() {
auto iter = upstream_locality_stats_.begin();
while (iter != upstream_locality_stats_.end()) {
if (iter->second->IsSafeToDelete()) {
iter = upstream_locality_stats_.erase(iter);
} else {
++iter;
}
}
}
void XdsClientStats::AddCallDropped(UniquePtr<char> category) {
total_dropped_requests_.FetchAdd(1, MemoryOrder::RELAXED);
MutexLock lock(&dropped_requests_mu_);
auto iter = dropped_requests_.find(category);
if (iter == dropped_requests_.end()) {
dropped_requests_.emplace(UniquePtr<char>(gpr_strdup(category.get())), 1);
} else {
++iter->second;
}
}
} // namespace grpc_core
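To make the recording side of these counters concrete, here is a minimal usage sketch (not part of this commit). It assumes the picker/call path already has an XdsClientStats and a LocalityStats at hand; the helper names are invented, while the XdsClientStats methods are the ones defined above.

#include <grpc/support/string_util.h>

#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"

namespace grpc_core {

// Invented helper: record a deliberately dropped call under a drop category.
void RecordDroppedCall(XdsClientStats* stats, const char* category) {
  // Bumps total_dropped_requests_ atomically and updates the per-category
  // map under dropped_requests_mu_.
  stats->AddCallDropped(UniquePtr<char>(gpr_strdup(category)));
}

// Invented helper: bracket a call that was actually issued to a locality.
void RecordIssuedCall(XdsClientStats::LocalityStats* locality_stats,
                      bool failed) {
  locality_stats->AddCallStarted();
  // ... the call runs; when its trailing metadata arrives ...
  locality_stats->AddCallFinished(/*fail=*/failed);
}

}  // namespace grpc_core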

@@ -21,49 +21,207 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
class XdsLbClientStats : public RefCounted<XdsLbClientStats> {
class XdsLocalityName : public RefCounted<XdsLocalityName> {
public:
struct DropTokenCount {
UniquePtr<char> token;
int64_t count;
struct Less {
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs,
const RefCountedPtr<XdsLocalityName>& rhs) {
int cmp_result = strcmp(lhs->region_.get(), rhs->region_.get());
if (cmp_result != 0) return cmp_result < 0;
cmp_result = strcmp(lhs->zone_.get(), rhs->zone_.get());
if (cmp_result != 0) return cmp_result < 0;
return strcmp(lhs->sub_zone_.get(), rhs->sub_zone_.get()) < 0;
}
};
XdsLocalityName(UniquePtr<char> region, UniquePtr<char> zone,
UniquePtr<char> subzone)
: region_(std::move(region)),
zone_(std::move(zone)),
sub_zone_(std::move(subzone)) {}
bool operator==(const XdsLocalityName& other) const {
return strcmp(region_.get(), other.region_.get()) == 0 &&
strcmp(zone_.get(), other.zone_.get()) == 0 &&
strcmp(sub_zone_.get(), other.sub_zone_.get()) == 0;
}
const char* region() const { return region_.get(); }
const char* zone() const { return zone_.get(); }
const char* sub_zone() const { return sub_zone_.get(); }
const char* AsHumanReadableString() {
if (human_readable_string_ == nullptr) {
char* tmp;
gpr_asprintf(&tmp, "{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
region_.get(), zone_.get(), sub_zone_.get());
human_readable_string_.reset(tmp);
}
return human_readable_string_.get();
}
private:
UniquePtr<char> region_;
UniquePtr<char> zone_;
UniquePtr<char> sub_zone_;
UniquePtr<char> human_readable_string_;
};
// The stats classes (i.e., XdsClientStats, LocalityStats, and LoadMetric)
// support taking a snapshot (which also resets the counters) to populate the
// load report. The snapshots are stored in the corresponding Snapshot structs,
// which have no synchronization of their own. The stats classes use several
// different synchronization methods:
// 1. Most of the counters are Atomic<>s, for performance.
// 2. Some of the Map<>s are protected by a Mutex, when we are not guaranteed
//    that accesses to them are synchronized by the callers.
// 3. The Map<>s whose accesses are already synchronized by the callers have
//    no additional synchronization here.
// Note that the Map<>s mentioned in 2 and 3 refer to the map's tree structure
// rather than the content of each tree node.
class XdsClientStats {
public:
class LocalityStats : public RefCounted<LocalityStats> {
public:
class LoadMetric {
public:
struct Snapshot {
bool IsAllZero() const;
DropTokenCount(UniquePtr<char> token, int64_t count)
: token(std::move(token)), count(count) {}
uint64_t num_requests_finished_with_metric;
double total_metric_value;
};
// Returns a snapshot of this instance and resets all the accumulated
// counters.
Snapshot GetSnapshotAndReset();
private:
uint64_t num_requests_finished_with_metric_{0};
double total_metric_value_{0};
};
using LoadMetricMap = Map<UniquePtr<char>, LoadMetric, StringLess>;
using LoadMetricSnapshotMap =
Map<UniquePtr<char>, LoadMetric::Snapshot, StringLess>;
struct Snapshot {
// TODO(juanlishen): Change this to const method when const_iterator is
// added to Map<>.
bool IsAllZero();
uint64_t total_successful_requests;
uint64_t total_requests_in_progress;
uint64_t total_error_requests;
uint64_t total_issued_requests;
LoadMetricSnapshotMap load_metric_stats;
};
// Returns a snapshot of this instance and resets all the accumulated
// counters.
Snapshot GetSnapshotAndReset();
// Each XdsLb::PickerWrapper holds a ref to the respective LocalityStats.
// If the refcount is 0, there won't be new calls recorded to the
// LocalityStats, so the LocalityStats can be safely deleted when all the
// in-progress calls have finished.
// Must only be called from the control plane combiner.
void RefByPicker() { picker_refcount_.FetchAdd(1, MemoryOrder::ACQ_REL); }
// Might be called from the control plane combiner or the data plane
// combiner.
// TODO(juanlishen): Once https://github.com/grpc/grpc/pull/19390 is merged,
// this method will also only be invoked in the control plane combiner.
// We may then be able to simplify the LocalityStats' lifetime by making it
// RefCounted<> and populating the protobuf in its dtor.
void UnrefByPicker() { picker_refcount_.FetchSub(1, MemoryOrder::ACQ_REL); }
// Must only be called from the control plane combiner.
// The only place where the picker_refcount_ can be increased is
// RefByPicker(), which also can only be called from the control plane
// combiner. Also, if the picker_refcount_ is 0, total_requests_in_progress_
// can't be increased from 0. So it's safe to delete the LocalityStats right
// after this method returns true.
bool IsSafeToDelete() {
return picker_refcount_.FetchAdd(0, MemoryOrder::ACQ_REL) == 0 &&
total_requests_in_progress_.FetchAdd(0, MemoryOrder::ACQ_REL) == 0;
}
void AddCallStarted();
void AddCallFinished(bool fail = false);
private:
Atomic<uint64_t> total_successful_requests_{0};
Atomic<uint64_t> total_requests_in_progress_{0};
// Requests that were issued (not dropped) but failed.
Atomic<uint64_t> total_error_requests_{0};
Atomic<uint64_t> total_issued_requests_{0};
// Protects load_metric_stats_. A mutex is necessary because the length of
// load_metric_stats_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata (not from any combiner) and the load
// reporting thread (from the control plane combiner).
Mutex load_metric_stats_mu_;
LoadMetricMap load_metric_stats_;
// Can be accessed from either the control plane combiner or the data plane
// combiner.
Atomic<uint8_t> picker_refcount_{0};
};
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
// TODO(juanlishen): The value type of Map<> must be movable in the current
// implementation. To avoid making LocalityStats movable, we wrap it in
// UniquePtr<>. We should remove this wrapper once the value type of Map<>
// no longer has to be movable.
using LocalityStatsMap =
Map<RefCountedPtr<XdsLocalityName>, RefCountedPtr<LocalityStats>,
XdsLocalityName::Less>;
using LocalityStatsSnapshotMap =
Map<RefCountedPtr<XdsLocalityName>, LocalityStats::Snapshot,
XdsLocalityName::Less>;
using DroppedRequestsMap = Map<UniquePtr<char>, uint64_t, StringLess>;
using DroppedRequestsSnapshotMap = DroppedRequestsMap;
XdsLbClientStats() {}
struct Snapshot {
// TODO(juanlishen): Change this to const method when const_iterator is
// added to Map<>.
bool IsAllZero();
void AddCallStarted();
void AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received);
LocalityStatsSnapshotMap upstream_locality_stats;
uint64_t total_dropped_requests;
DroppedRequestsSnapshotMap dropped_requests;
// The actual load report interval.
grpc_millis load_report_interval;
};
// This method is not thread-safe; caller must synchronize.
void AddCallDroppedLocked(char* token);
// Returns a snapshot of this instance and resets all the accumulated
// counters.
Snapshot GetSnapshotAndReset();
// This method is not thread-safe; caller must synchronize.
void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
UniquePtr<DroppedCallCounts>* drop_token_counts);
void MaybeInitLastReportTime();
RefCountedPtr<LocalityStats> FindLocalityStats(
const RefCountedPtr<XdsLocalityName>& locality_name);
void PruneLocalityStats();
void AddCallDropped(UniquePtr<char> category);
private:
// This field must only be accessed via *_locked() methods.
UniquePtr<DroppedCallCounts> drop_token_counts_;
// These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started_ = 0;
gpr_atm num_calls_finished_ = 0;
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
gpr_atm num_calls_finished_known_received_ = 0;
// The stats for each locality.
LocalityStatsMap upstream_locality_stats_;
Atomic<uint64_t> total_dropped_requests_{0};
// Protects dropped_requests_. A mutex is necessary because the length of
// dropped_requests_ can be accessed by both the picker (from data plane
// combiner) and the load reporting thread (from the control plane combiner).
Mutex dropped_requests_mu_;
DroppedRequestsMap dropped_requests_;
// The timestamp of last reporting. For the LB-policy-wide first report, the
// last_report_time is the time we scheduled the first reporting timer.
grpc_millis last_report_time_ = -1;
};
} // namespace grpc_core
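The synchronization comment above pairs with the reporting path: the following is a condensed sketch (not verbatim from this diff) of the snapshot-and-reset sequence that XdsLrsRequestCreateAndEncode() in xds_load_balancer_api.cc runs on each report tick in the control plane combiner; the wrapper function name is invented.

#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"

namespace grpc_core {

// Invented helper: one reporting tick. Returns true if there is anything
// worth sending to the LRS server this interval.
bool SnapshotForReport(XdsClientStats* client_stats,
                       XdsClientStats::Snapshot* out) {
  // Atomically drains the accumulated counters; total_requests_in_progress
  // is read but deliberately not reset (see GetSnapshotAndReset above).
  *out = client_stats->GetSnapshotAndReset();
  // Drop LocalityStats that no picker references and that have no calls in
  // flight (see IsSafeToDelete).
  client_stats->PruneLocalityStats();
  // An all-zero snapshot means this interval produced no load to report.
  return !out->IsAllZero();
}

}  // namespace grpc_core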

@@ -33,7 +33,10 @@
#include "envoy/api/v2/discovery.upb.h"
#include "envoy/api/v2/eds.upb.h"
#include "envoy/api/v2/endpoint/endpoint.upb.h"
#include "envoy/api/v2/endpoint/load_report.upb.h"
#include "envoy/service/load_stats/v2/lrs.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/timestamp.upb.h"
#include "google/protobuf/wrappers.upb.h"
@@ -209,54 +212,163 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
namespace {
void google_protobuf_Timestamp_assign(google_protobuf_Timestamp* timestamp,
const gpr_timespec& value) {
google_protobuf_Timestamp_set_seconds(timestamp, value.tv_sec);
google_protobuf_Timestamp_set_nanos(timestamp, value.tv_nsec);
grpc_slice LrsRequestEncode(
const envoy_service_load_stats_v2_LoadStatsRequest* request,
upb_arena* arena) {
size_t output_length;
char* output = envoy_service_load_stats_v2_LoadStatsRequest_serialize(
request, arena, &output_length);
return grpc_slice_from_copied_buffer(output, output_length);
}
} // namespace
xds_grpclb_request* xds_grpclb_load_report_request_create_locked(
grpc_core::XdsLbClientStats* client_stats, upb_arena* arena) {
xds_grpclb_request* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
grpc_lb_v1_ClientStats* req_stats =
grpc_lb_v1_LoadBalanceRequest_mutable_client_stats(req, arena);
google_protobuf_Timestamp_assign(
grpc_lb_v1_ClientStats_mutable_timestamp(req_stats, arena),
gpr_now(GPR_CLOCK_REALTIME));
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name) {
upb::Arena arena;
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
// Add cluster stats. There is only one because we only use one server name in
// one channel.
envoy_api_v2_endpoint_ClusterStats* cluster_stats =
envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
request, arena.ptr());
// Set the cluster name.
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
cluster_stats, upb_strview_makez(server_name));
return LrsRequestEncode(request, arena.ptr());
}
namespace {
void LocalityStatsPopulate(envoy_api_v2_endpoint_UpstreamLocalityStats* output,
Pair<RefCountedPtr<XdsLocalityName>,
XdsClientStats::LocalityStats::Snapshot>& input,
upb_arena* arena) {
// Set sub_zone.
envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
arena);
envoy_api_v2_core_Locality_set_sub_zone(
locality, upb_strview_makez(input.first->sub_zone()));
// Set total counts.
XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
output, snapshot.total_successful_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
output, snapshot.total_requests_in_progress);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_error_requests(
output, snapshot.total_error_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_issued_requests(
output, snapshot.total_issued_requests);
// Add load metric stats.
for (auto& p : snapshot.load_metric_stats) {
const char* metric_name = p.first.get();
const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
p.second;
envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
envoy_api_v2_endpoint_UpstreamLocalityStats_add_load_metric_stats(
output, arena);
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_metric_name(
load_metric, upb_strview_makez(metric_name));
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
load_metric, metric_value.num_requests_finished_with_metric);
envoy_api_v2_endpoint_EndpointLoadMetricStats_set_total_metric_value(
load_metric, metric_value.total_metric_value);
}
}
int64_t num_calls_started;
int64_t num_calls_finished;
int64_t num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_known_received;
UniquePtr<XdsLbClientStats::DroppedCallCounts> drop_token_counts;
client_stats->GetLocked(&num_calls_started, &num_calls_finished,
&num_calls_finished_with_client_failed_to_send,
&num_calls_finished_known_received,
&drop_token_counts);
grpc_lb_v1_ClientStats_set_num_calls_started(req_stats, num_calls_started);
grpc_lb_v1_ClientStats_set_num_calls_finished(req_stats, num_calls_finished);
grpc_lb_v1_ClientStats_set_num_calls_finished_with_client_failed_to_send(
req_stats, num_calls_finished_with_client_failed_to_send);
grpc_lb_v1_ClientStats_set_num_calls_finished_known_received(
req_stats, num_calls_finished_known_received);
if (drop_token_counts != nullptr) {
for (size_t i = 0; i < drop_token_counts->size(); ++i) {
XdsLbClientStats::DropTokenCount& cur = (*drop_token_counts)[i];
grpc_lb_v1_ClientStatsPerToken* cur_msg =
grpc_lb_v1_ClientStats_add_calls_finished_with_drop(req_stats, arena);
} // namespace
const size_t token_len = strlen(cur.token.get());
char* token = reinterpret_cast<char*>(upb_arena_malloc(arena, token_len));
memcpy(token, cur.token.get(), token_len);
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
XdsClientStats* client_stats) {
upb::Arena arena;
XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
// Prune unused locality stats.
client_stats->PruneLocalityStats();
// When all the counts are zero, return empty slice.
if (snapshot.IsAllZero()) return grpc_empty_slice();
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
// Add cluster stats. There is only one because we only use one server name in
// one channel.
envoy_api_v2_endpoint_ClusterStats* cluster_stats =
envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
request, arena.ptr());
// Set the cluster name.
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
cluster_stats, upb_strview_makez(server_name));
// Add locality stats.
for (auto& p : snapshot.upstream_locality_stats) {
envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
cluster_stats, arena.ptr());
LocalityStatsPopulate(locality_stats, p, arena.ptr());
}
// Add dropped requests.
for (auto& p : snapshot.dropped_requests) {
const char* category = p.first.get();
const uint64_t count = p.second;
envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
arena.ptr());
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
dropped_requests, upb_strview_makez(category));
envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
dropped_requests, count);
}
// Set total dropped requests.
envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
cluster_stats, snapshot.total_dropped_requests);
// Set real load report interval.
gpr_timespec timespec =
grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
google_protobuf_Duration* load_report_interval =
envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
cluster_stats, arena.ptr());
google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
return LrsRequestEncode(request, arena.ptr());
}
grpc_lb_v1_ClientStatsPerToken_set_load_balance_token(
cur_msg, upb_strview_make(token, token_len));
grpc_lb_v1_ClientStatsPerToken_set_num_calls(cur_msg, cur.count);
}
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
grpc_millis* load_reporting_interval,
const char* expected_server_name) {
upb::Arena arena;
// Decode the response.
const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
envoy_service_load_stats_v2_LoadStatsResponse_parse(
reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
// Parse the response.
if (decoded_response == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
}
// Check the cluster size in the response.
size_t size;
const upb_strview* clusters =
envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
&size);
if (size != 1) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"The number of clusters (server names) is not 1.");
}
return req;
// Check the cluster name in the response
if (strncmp(expected_server_name, clusters[0].data, clusters[0].size) != 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Unexpected cluster (server name).");
}
// Get the load report interval.
const google_protobuf_Duration* load_reporting_interval_duration =
envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(
decoded_response);
gpr_timespec timespec{
google_protobuf_Duration_seconds(load_reporting_interval_duration),
google_protobuf_Duration_nanos(load_reporting_interval_duration),
GPR_TIMESPAN};
*load_reporting_interval = gpr_time_to_millis(timespec);
return GRPC_ERROR_NONE;
}
} // namespace grpc_core

@@ -25,58 +25,9 @@
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/proto/grpc/lb/v1/load_balancer.upb.h"
namespace grpc_core {
typedef grpc_lb_v1_LoadBalanceRequest xds_grpclb_request;
class XdsLocalityName : public RefCounted<XdsLocalityName> {
public:
struct Less {
bool operator()(const RefCountedPtr<XdsLocalityName>& lhs,
const RefCountedPtr<XdsLocalityName>& rhs) {
int cmp_result = strcmp(lhs->region_.get(), rhs->region_.get());
if (cmp_result != 0) return cmp_result < 0;
cmp_result = strcmp(lhs->zone_.get(), rhs->zone_.get());
if (cmp_result != 0) return cmp_result < 0;
return strcmp(lhs->sub_zone_.get(), rhs->sub_zone_.get()) < 0;
}
};
XdsLocalityName(UniquePtr<char> region, UniquePtr<char> zone,
UniquePtr<char> sub_zone)
: region_(std::move(region)),
zone_(std::move(zone)),
sub_zone_(std::move(sub_zone)) {}
bool operator==(const XdsLocalityName& other) const {
return strcmp(region_.get(), other.region_.get()) == 0 &&
strcmp(zone_.get(), other.zone_.get()) == 0 &&
strcmp(sub_zone_.get(), other.sub_zone_.get()) == 0;
}
const char* region() const { return region_.get(); }
const char* zone() const { return zone_.get(); }
const char* sub_zone() const { return sub_zone_.get(); }
const char* AsHumanReadableString() {
if (human_readable_string_ == nullptr) {
char* tmp;
gpr_asprintf(&tmp, "{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
region_.get(), zone_.get(), sub_zone_.get());
human_readable_string_.reset(tmp);
}
return human_readable_string_.get();
}
private:
UniquePtr<char> region_;
UniquePtr<char> zone_;
UniquePtr<char> sub_zone_;
UniquePtr<char> human_readable_string_;
};
struct XdsLocalityInfo {
bool operator==(const XdsLocalityInfo& other) const {
return *locality_name == *other.locality_name &&
@@ -112,9 +63,20 @@ grpc_slice XdsEdsRequestCreateAndEncode(const char* service_name);
grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
XdsUpdate* update);
// TODO(juanlishen): Delete these when LRS is added.
xds_grpclb_request* xds_grpclb_load_report_request_create_locked(
grpc_core::XdsLbClientStats* client_stats, upb_arena* arena);
// Creates an LRS request querying \a server_name.
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name);
// Creates an LRS request sending client-side load reports. If all the counters
// in \a client_stats are zero, returns an empty slice.
grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
XdsClientStats* client_stats);
// Parses the LRS response and returns the client-side load reporting interval.
// If there is any error (e.g., the found server name doesn't match \a
// expected_server_name), the output config is invalid.
grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
grpc_millis* load_reporting_interval,
const char* expected_server_name);
} // namespace grpc_core
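A hedged sketch of how the xds policy's LRS call might drive these three helpers; only the declarations above are from this diff, while the control flow, the HandleLrsResponseAndReport function, and its parameters are assumptions for illustration.

#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h"

namespace grpc_core {

// Invented helper: process one LoadStatsResponse and, if valid, emit the next
// load report. server_name, client_stats, and response_slice would come from
// the LRS call state in xds.cc; the initial request on the stream would use
// the one-argument overload XdsLrsRequestCreateAndEncode(server_name).
grpc_error* HandleLrsResponseAndReport(const char* server_name,
                                       XdsClientStats* client_stats,
                                       const grpc_slice& response_slice,
                                       grpc_millis* load_reporting_interval) {
  // Validate the response (the cluster list must be exactly the expected
  // server name) and extract the interval the balancer asked for.
  grpc_error* error = XdsLrsResponseDecodeAndParse(
      response_slice, load_reporting_interval, server_name);
  if (error != GRPC_ERROR_NONE) return error;
  // On each reporting tick: an empty slice means every counter was zero and
  // nothing needs to be sent this interval.
  grpc_slice report = XdsLrsRequestCreateAndEncode(server_name, client_stats);
  if (GRPC_SLICE_LENGTH(report) > 0) {
    // ... send `report` on the LRS stream ...
  }
  grpc_slice_unref(report);
  return GRPC_ERROR_NONE;
}

}  // namespace grpc_core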

@@ -49,6 +49,10 @@ class Atomic {
storage_.store(val, static_cast<std::memory_order>(order));
}
T Exchange(T desired, MemoryOrder order) {
return storage_.exchange(desired, static_cast<std::memory_order>(order));
}
bool CompareExchangeWeak(T* expected, T desired, MemoryOrder success,
MemoryOrder failure) {
return GPR_ATM_INC_CAS_THEN(storage_.compare_exchange_weak(
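
The Exchange() method added above exists to support the read-and-zero idiom used by the stats code (GetAndResetCounter() in xds_client_stats.cc). A minimal standalone sketch, with the counter and helper names invented:

#include <cstdint>

#include "src/core/lib/gprpp/atomic.h"

// Example: hot paths bump the counter with FetchAdd, and the reporting path
// drains it in one atomic step with the new Exchange().
grpc_core::Atomic<uint64_t> g_requests{0};

void OnRequest() {
  g_requests.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
}

uint64_t DrainForReport() {
  // Returns the accumulated count and leaves zero behind.
  return g_requests.Exchange(0, grpc_core::MemoryOrder::RELAXED);
}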

@@ -89,7 +89,7 @@ class Map {
// Removes the current entry and points to the next one
iterator erase(iterator iter);
size_t size() { return size_; }
size_t size() const { return size_; }
template <class... Args>
Pair<iterator, bool> emplace(Args&&... args);

File diff suppressed because it is too large (src/core/lib/transport/static_metadata.cc).

@@ -36,7 +36,7 @@
static_assert(
std::is_trivially_destructible<grpc_core::StaticMetadataSlice>::value,
"grpc_core::StaticMetadataSlice must be trivially destructible.");
#define GRPC_STATIC_MDSTR_COUNT 107
#define GRPC_STATIC_MDSTR_COUNT 108
extern const grpc_core::StaticMetadataSlice
grpc_static_slice_table[GRPC_STATIC_MDSTR_COUNT];
/* ":path" */
@@ -111,157 +111,160 @@ extern const grpc_core::StaticMetadataSlice
/* "/grpc.lb.v1.LoadBalancer/BalanceLoad" */
#define GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD \
(grpc_static_slice_table[33])
/* "/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats" */
#define GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS \
(grpc_static_slice_table[34])
/* "/envoy.api.v2.EndpointDiscoveryService/StreamEndpoints" */
#define GRPC_MDSTR_SLASH_ENVOY_DOT_API_DOT_V2_DOT_ENDPOINTDISCOVERYSERVICE_SLASH_STREAMENDPOINTS \
(grpc_static_slice_table[34])
(grpc_static_slice_table[35])
/* "/grpc.health.v1.Health/Watch" */
#define GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH \
(grpc_static_slice_table[35])
(grpc_static_slice_table[36])
/* "/envoy.service.discovery.v2.AggregatedDiscoveryService/StreamAggregatedResources"
*/
#define GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES \
(grpc_static_slice_table[36])
(grpc_static_slice_table[37])
/* "deflate" */
#define GRPC_MDSTR_DEFLATE (grpc_static_slice_table[37])
#define GRPC_MDSTR_DEFLATE (grpc_static_slice_table[38])
/* "gzip" */
#define GRPC_MDSTR_GZIP (grpc_static_slice_table[38])
#define GRPC_MDSTR_GZIP (grpc_static_slice_table[39])
/* "stream/gzip" */
#define GRPC_MDSTR_STREAM_SLASH_GZIP (grpc_static_slice_table[39])
#define GRPC_MDSTR_STREAM_SLASH_GZIP (grpc_static_slice_table[40])
/* "GET" */
#define GRPC_MDSTR_GET (grpc_static_slice_table[40])
#define GRPC_MDSTR_GET (grpc_static_slice_table[41])
/* "POST" */
#define GRPC_MDSTR_POST (grpc_static_slice_table[41])
#define GRPC_MDSTR_POST (grpc_static_slice_table[42])
/* "/" */
#define GRPC_MDSTR_SLASH (grpc_static_slice_table[42])
#define GRPC_MDSTR_SLASH (grpc_static_slice_table[43])
/* "/index.html" */
#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (grpc_static_slice_table[43])
#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (grpc_static_slice_table[44])
/* "http" */
#define GRPC_MDSTR_HTTP (grpc_static_slice_table[44])
#define GRPC_MDSTR_HTTP (grpc_static_slice_table[45])
/* "https" */
#define GRPC_MDSTR_HTTPS (grpc_static_slice_table[45])
#define GRPC_MDSTR_HTTPS (grpc_static_slice_table[46])
/* "200" */
#define GRPC_MDSTR_200 (grpc_static_slice_table[46])
#define GRPC_MDSTR_200 (grpc_static_slice_table[47])
/* "204" */
#define GRPC_MDSTR_204 (grpc_static_slice_table[47])
#define GRPC_MDSTR_204 (grpc_static_slice_table[48])
/* "206" */
#define GRPC_MDSTR_206 (grpc_static_slice_table[48])
#define GRPC_MDSTR_206 (grpc_static_slice_table[49])
/* "304" */
#define GRPC_MDSTR_304 (grpc_static_slice_table[49])
#define GRPC_MDSTR_304 (grpc_static_slice_table[50])
/* "400" */
#define GRPC_MDSTR_400 (grpc_static_slice_table[50])
#define GRPC_MDSTR_400 (grpc_static_slice_table[51])
/* "404" */
#define GRPC_MDSTR_404 (grpc_static_slice_table[51])
#define GRPC_MDSTR_404 (grpc_static_slice_table[52])
/* "500" */
#define GRPC_MDSTR_500 (grpc_static_slice_table[52])
#define GRPC_MDSTR_500 (grpc_static_slice_table[53])
/* "accept-charset" */
#define GRPC_MDSTR_ACCEPT_CHARSET (grpc_static_slice_table[53])
#define GRPC_MDSTR_ACCEPT_CHARSET (grpc_static_slice_table[54])
/* "gzip, deflate" */
#define GRPC_MDSTR_GZIP_COMMA_DEFLATE (grpc_static_slice_table[54])
#define GRPC_MDSTR_GZIP_COMMA_DEFLATE (grpc_static_slice_table[55])
/* "accept-language" */
#define GRPC_MDSTR_ACCEPT_LANGUAGE (grpc_static_slice_table[55])
#define GRPC_MDSTR_ACCEPT_LANGUAGE (grpc_static_slice_table[56])
/* "accept-ranges" */
#define GRPC_MDSTR_ACCEPT_RANGES (grpc_static_slice_table[56])
#define GRPC_MDSTR_ACCEPT_RANGES (grpc_static_slice_table[57])
/* "accept" */
#define GRPC_MDSTR_ACCEPT (grpc_static_slice_table[57])
#define GRPC_MDSTR_ACCEPT (grpc_static_slice_table[58])
/* "access-control-allow-origin" */
#define GRPC_MDSTR_ACCESS_CONTROL_ALLOW_ORIGIN (grpc_static_slice_table[58])
#define GRPC_MDSTR_ACCESS_CONTROL_ALLOW_ORIGIN (grpc_static_slice_table[59])
/* "age" */
#define GRPC_MDSTR_AGE (grpc_static_slice_table[59])
#define GRPC_MDSTR_AGE (grpc_static_slice_table[60])
/* "allow" */
#define GRPC_MDSTR_ALLOW (grpc_static_slice_table[60])
#define GRPC_MDSTR_ALLOW (grpc_static_slice_table[61])
/* "authorization" */
#define GRPC_MDSTR_AUTHORIZATION (grpc_static_slice_table[61])
#define GRPC_MDSTR_AUTHORIZATION (grpc_static_slice_table[62])
/* "cache-control" */
#define GRPC_MDSTR_CACHE_CONTROL (grpc_static_slice_table[62])
#define GRPC_MDSTR_CACHE_CONTROL (grpc_static_slice_table[63])
/* "content-disposition" */
#define GRPC_MDSTR_CONTENT_DISPOSITION (grpc_static_slice_table[63])
#define GRPC_MDSTR_CONTENT_DISPOSITION (grpc_static_slice_table[64])
/* "content-language" */
#define GRPC_MDSTR_CONTENT_LANGUAGE (grpc_static_slice_table[64])
#define GRPC_MDSTR_CONTENT_LANGUAGE (grpc_static_slice_table[65])
/* "content-length" */
#define GRPC_MDSTR_CONTENT_LENGTH (grpc_static_slice_table[65])
#define GRPC_MDSTR_CONTENT_LENGTH (grpc_static_slice_table[66])
/* "content-location" */
#define GRPC_MDSTR_CONTENT_LOCATION (grpc_static_slice_table[66])
#define GRPC_MDSTR_CONTENT_LOCATION (grpc_static_slice_table[67])
/* "content-range" */
#define GRPC_MDSTR_CONTENT_RANGE (grpc_static_slice_table[67])
#define GRPC_MDSTR_CONTENT_RANGE (grpc_static_slice_table[68])
/* "cookie" */
#define GRPC_MDSTR_COOKIE (grpc_static_slice_table[68])
#define GRPC_MDSTR_COOKIE (grpc_static_slice_table[69])
/* "date" */
#define GRPC_MDSTR_DATE (grpc_static_slice_table[69])
#define GRPC_MDSTR_DATE (grpc_static_slice_table[70])
/* "etag" */
#define GRPC_MDSTR_ETAG (grpc_static_slice_table[70])
#define GRPC_MDSTR_ETAG (grpc_static_slice_table[71])
/* "expect" */
#define GRPC_MDSTR_EXPECT (grpc_static_slice_table[71])
#define GRPC_MDSTR_EXPECT (grpc_static_slice_table[72])
/* "expires" */
#define GRPC_MDSTR_EXPIRES (grpc_static_slice_table[72])
#define GRPC_MDSTR_EXPIRES (grpc_static_slice_table[73])
/* "from" */
#define GRPC_MDSTR_FROM (grpc_static_slice_table[73])
#define GRPC_MDSTR_FROM (grpc_static_slice_table[74])
/* "if-match" */
#define GRPC_MDSTR_IF_MATCH (grpc_static_slice_table[74])
#define GRPC_MDSTR_IF_MATCH (grpc_static_slice_table[75])
/* "if-modified-since" */
#define GRPC_MDSTR_IF_MODIFIED_SINCE (grpc_static_slice_table[75])
#define GRPC_MDSTR_IF_MODIFIED_SINCE (grpc_static_slice_table[76])
/* "if-none-match" */
#define GRPC_MDSTR_IF_NONE_MATCH (grpc_static_slice_table[76])
#define GRPC_MDSTR_IF_NONE_MATCH (grpc_static_slice_table[77])
/* "if-range" */
#define GRPC_MDSTR_IF_RANGE (grpc_static_slice_table[77])
#define GRPC_MDSTR_IF_RANGE (grpc_static_slice_table[78])
/* "if-unmodified-since" */
#define GRPC_MDSTR_IF_UNMODIFIED_SINCE (grpc_static_slice_table[78])
#define GRPC_MDSTR_IF_UNMODIFIED_SINCE (grpc_static_slice_table[79])
/* "last-modified" */
#define GRPC_MDSTR_LAST_MODIFIED (grpc_static_slice_table[79])
#define GRPC_MDSTR_LAST_MODIFIED (grpc_static_slice_table[80])
/* "link" */
#define GRPC_MDSTR_LINK (grpc_static_slice_table[80])
#define GRPC_MDSTR_LINK (grpc_static_slice_table[81])
/* "location" */
#define GRPC_MDSTR_LOCATION (grpc_static_slice_table[81])
#define GRPC_MDSTR_LOCATION (grpc_static_slice_table[82])
/* "max-forwards" */
#define GRPC_MDSTR_MAX_FORWARDS (grpc_static_slice_table[82])
#define GRPC_MDSTR_MAX_FORWARDS (grpc_static_slice_table[83])
/* "proxy-authenticate" */
#define GRPC_MDSTR_PROXY_AUTHENTICATE (grpc_static_slice_table[83])
#define GRPC_MDSTR_PROXY_AUTHENTICATE (grpc_static_slice_table[84])
/* "proxy-authorization" */
#define GRPC_MDSTR_PROXY_AUTHORIZATION (grpc_static_slice_table[84])
#define GRPC_MDSTR_PROXY_AUTHORIZATION (grpc_static_slice_table[85])
/* "range" */
#define GRPC_MDSTR_RANGE (grpc_static_slice_table[85])
#define GRPC_MDSTR_RANGE (grpc_static_slice_table[86])
/* "referer" */
#define GRPC_MDSTR_REFERER (grpc_static_slice_table[86])
#define GRPC_MDSTR_REFERER (grpc_static_slice_table[87])
/* "refresh" */
#define GRPC_MDSTR_REFRESH (grpc_static_slice_table[87])
#define GRPC_MDSTR_REFRESH (grpc_static_slice_table[88])
/* "retry-after" */
#define GRPC_MDSTR_RETRY_AFTER (grpc_static_slice_table[88])
#define GRPC_MDSTR_RETRY_AFTER (grpc_static_slice_table[89])
/* "server" */
#define GRPC_MDSTR_SERVER (grpc_static_slice_table[89])
#define GRPC_MDSTR_SERVER (grpc_static_slice_table[90])
/* "set-cookie" */
#define GRPC_MDSTR_SET_COOKIE (grpc_static_slice_table[90])
#define GRPC_MDSTR_SET_COOKIE (grpc_static_slice_table[91])
/* "strict-transport-security" */
#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (grpc_static_slice_table[91])
#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (grpc_static_slice_table[92])
/* "transfer-encoding" */
#define GRPC_MDSTR_TRANSFER_ENCODING (grpc_static_slice_table[92])
#define GRPC_MDSTR_TRANSFER_ENCODING (grpc_static_slice_table[93])
/* "vary" */
#define GRPC_MDSTR_VARY (grpc_static_slice_table[93])
#define GRPC_MDSTR_VARY (grpc_static_slice_table[94])
/* "via" */
#define GRPC_MDSTR_VIA (grpc_static_slice_table[94])
#define GRPC_MDSTR_VIA (grpc_static_slice_table[95])
/* "www-authenticate" */
#define GRPC_MDSTR_WWW_AUTHENTICATE (grpc_static_slice_table[95])
#define GRPC_MDSTR_WWW_AUTHENTICATE (grpc_static_slice_table[96])
/* "0" */
#define GRPC_MDSTR_0 (grpc_static_slice_table[96])
#define GRPC_MDSTR_0 (grpc_static_slice_table[97])
/* "identity" */
#define GRPC_MDSTR_IDENTITY (grpc_static_slice_table[97])
#define GRPC_MDSTR_IDENTITY (grpc_static_slice_table[98])
/* "trailers" */
#define GRPC_MDSTR_TRAILERS (grpc_static_slice_table[98])
#define GRPC_MDSTR_TRAILERS (grpc_static_slice_table[99])
/* "application/grpc" */
#define GRPC_MDSTR_APPLICATION_SLASH_GRPC (grpc_static_slice_table[99])
#define GRPC_MDSTR_APPLICATION_SLASH_GRPC (grpc_static_slice_table[100])
/* "grpc" */
#define GRPC_MDSTR_GRPC (grpc_static_slice_table[100])
#define GRPC_MDSTR_GRPC (grpc_static_slice_table[101])
/* "PUT" */
#define GRPC_MDSTR_PUT (grpc_static_slice_table[101])
#define GRPC_MDSTR_PUT (grpc_static_slice_table[102])
/* "lb-cost-bin" */
#define GRPC_MDSTR_LB_COST_BIN (grpc_static_slice_table[102])
#define GRPC_MDSTR_LB_COST_BIN (grpc_static_slice_table[103])
/* "identity,deflate" */
#define GRPC_MDSTR_IDENTITY_COMMA_DEFLATE (grpc_static_slice_table[103])
#define GRPC_MDSTR_IDENTITY_COMMA_DEFLATE (grpc_static_slice_table[104])
/* "identity,gzip" */
#define GRPC_MDSTR_IDENTITY_COMMA_GZIP (grpc_static_slice_table[104])
#define GRPC_MDSTR_IDENTITY_COMMA_GZIP (grpc_static_slice_table[105])
/* "deflate,gzip" */
#define GRPC_MDSTR_DEFLATE_COMMA_GZIP (grpc_static_slice_table[105])
#define GRPC_MDSTR_DEFLATE_COMMA_GZIP (grpc_static_slice_table[106])
/* "identity,deflate,gzip" */
#define GRPC_MDSTR_IDENTITY_COMMA_DEFLATE_COMMA_GZIP \
(grpc_static_slice_table[106])
(grpc_static_slice_table[107])
namespace grpc_core {
struct StaticSliceRefcount;

@@ -22,10 +22,20 @@ grpc_package(
)
grpc_proto_library(
name = "xds_for_test_proto",
name = "eds_for_test_proto",
srcs = [
"xds_for_test.proto",
"eds_for_test.proto",
],
has_services = True,
well_known_protos = True,
)
grpc_proto_library(
name = "lrs_for_test_proto",
srcs = [
"lrs_for_test.proto",
],
has_services = True,
well_known_protos = True,
deps = [":eds_for_test_proto"],
)

@@ -0,0 +1,180 @@
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains the lrs protocol and its dependency.
//
// TODO(juanlishen): It's a workaround and should be removed once we have a
// clean solution to the circular dependency between the envoy data plane APIs
// and gRPC. We can't check in this file due to conflict with internal code.
syntax = "proto3";
package envoy.service.load_stats.v2;
import "google/protobuf/duration.proto";
import "src/proto/grpc/lb/v2/eds_for_test.proto";
// [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
message EndpointLoadMetricStats {
// Name of the metric; may be empty.
string metric_name = 1;
// Number of calls that finished and included this metric.
uint64 num_requests_finished_with_metric = 2;
// Sum of metric values across all calls that finished with this metric for
// load_reporting_interval.
double total_metric_value = 3;
}
message UpstreamLocalityStats {
// Name of zone, region and optionally endpoint group these metrics were
// collected from. Zone and region names could be empty if unknown.
envoy.api.v2.Locality locality = 1;
// The total number of requests successfully completed by the endpoints in the
// locality.
uint64 total_successful_requests = 2;
// The total number of unfinished requests
uint64 total_requests_in_progress = 3;
// The total number of requests that failed due to errors at the endpoint,
// aggregated over all endpoints in the locality.
uint64 total_error_requests = 4;
// The total number of requests that were issued by this Envoy since
// the last report. This information is aggregated over all the
// upstream endpoints in the locality.
uint64 total_issued_requests = 8;
// Stats for multi-dimensional load balancing.
repeated EndpointLoadMetricStats load_metric_stats = 5;
// // Endpoint granularity stats information for this locality. This information
// // is populated if the Server requests it by setting
// // :ref:`LoadStatsResponse.report_endpoint_granularity<envoy_api_field_load_stats.LoadStatsResponse.report_endpoint_granularity>`.
// repeated UpstreamEndpointStats upstream_endpoint_stats = 7;
// [#not-implemented-hide:] The priority of the endpoint group these metrics
// were collected from.
uint32 priority = 6;
}
// Per cluster load stats. Envoy reports these stats to a management server in a
// :ref:`LoadStatsRequest<envoy_api_msg_load_stats.LoadStatsRequest>`
// [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
// Next ID: 7
message ClusterStats {
// The name of the cluster.
string cluster_name = 1;
// The eds_cluster_config service_name of the cluster.
// It's possible that two clusters send the same service_name to EDS,
// in that case, the management server is supposed to do aggregation on the load reports.
string cluster_service_name = 6;
// Need at least one.
repeated UpstreamLocalityStats upstream_locality_stats = 2;
// Cluster-level stats such as total_successful_requests may be computed by
// summing upstream_locality_stats. In addition, below there are additional
// cluster-wide stats.
//
// The total number of dropped requests. This covers requests
// deliberately dropped by the drop_overload policy and circuit breaking.
uint64 total_dropped_requests = 3;
message DroppedRequests {
// Identifier for the policy specifying the drop.
string category = 1;
// Total number of deliberately dropped requests for the category.
uint64 dropped_count = 2;
}
// Information about deliberately dropped requests for each category specified
// in the DropOverload policy.
repeated DroppedRequests dropped_requests = 5;
// Period over which the actual load report occurred. This will be guaranteed to include every
// request reported. Due to system load and delays between the *LoadStatsRequest* sent from Envoy
// and the *LoadStatsResponse* message sent from the management server, this may be longer than
// the requested load reporting interval in the *LoadStatsResponse*.
google.protobuf.Duration load_report_interval = 4;
}
// [#protodoc-title: Load reporting service]
service LoadReportingService {
// Advanced API to allow for multi-dimensional load balancing by remote
// server. For receiving LB assignments, the steps are:
// 1. The management server is configured with per cluster/zone/load metric
// capacity configuration. The capacity configuration definition is
// outside of the scope of this document.
// 2. Envoy issues a standard {Stream,Fetch}Endpoints request for the clusters
// to balance.
//
// Independently, Envoy will initiate a StreamLoadStats bidi stream with a
// management server:
// 1. Once a connection establishes, the management server publishes a
// LoadStatsResponse for all clusters it is interested in learning load
// stats about.
// 2. For each cluster, Envoy load balances incoming traffic to upstream hosts
// based on per-zone weights and/or per-instance weights (if specified)
// based on intra-zone LbPolicy. This information comes from the above
// {Stream,Fetch}Endpoints.
// 3. When upstream hosts reply, they optionally add header <define header
// name> with ASCII representation of EndpointLoadMetricStats.
// 4. Envoy aggregates load reports over the period of time given to it in
// LoadStatsResponse.load_reporting_interval. This includes aggregation
// stats Envoy maintains by itself (total_requests, rpc_errors etc.) as
// well as load metrics from upstream hosts.
// 5. When the timer of load_reporting_interval expires, Envoy sends new
// LoadStatsRequest filled with load reports for each cluster.
// 6. The management server uses the load reports from all reported Envoys
// from around the world, computes global assignment and prepares traffic
// assignment destined for each zone Envoys are located in. Goto 2.
rpc StreamLoadStats(stream LoadStatsRequest) returns (stream LoadStatsResponse) {
}
}
// A load report Envoy sends to the management server.
// [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
message LoadStatsRequest {
// Node identifier for Envoy instance.
envoy.api.v2.Node node = 1;
// A list of load stats to report.
repeated ClusterStats cluster_stats = 2;
}
// The management server sends envoy a LoadStatsResponse with all clusters it
// is interested in learning load stats about.
// [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.
message LoadStatsResponse {
// Clusters to report stats for.
repeated string clusters = 1;
// The minimum interval of time to collect stats over. This is only a minimum for two reasons:
// 1. There may be some delay from when the timer fires until stats sampling occurs.
// 2. For clusters that were already featured in the previous *LoadStatsResponse*, any traffic
// that is observed in between the corresponding previous *LoadStatsRequest* and this
// *LoadStatsResponse* will also be accumulated and billed to the cluster. This avoids a period
// of inobservability that might otherwise exist between the messages. New clusters are not
// subject to this consideration.
google.protobuf.Duration load_reporting_interval = 2;
// Set to *true* if the management server supports endpoint granularity
// report.
bool report_endpoint_granularity = 3;
}
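
Since the diff of test/cpp/end2end/xds_end2end_test.cc is suppressed below, here is a hedged, minimal sketch of the kind of fake LRS backend such a test could run against this proto. The FakeLrsService class and its behavior are assumptions for illustration; only the message and service shapes come from lrs_for_test.proto.

#include <grpcpp/grpcpp.h>

#include "src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.h"

namespace {

using ::envoy::service::load_stats::v2::LoadReportingService;
using ::envoy::service::load_stats::v2::LoadStatsRequest;
using ::envoy::service::load_stats::v2::LoadStatsResponse;

// Minimal fake balancer-side LRS implementation (illustrative only).
class FakeLrsService : public LoadReportingService::Service {
 public:
  grpc::Status StreamLoadStats(
      grpc::ServerContext* /*context*/,
      grpc::ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>* stream)
      override {
    // The first request names the cluster (server name) the client reports on.
    LoadStatsRequest request;
    if (!stream->Read(&request)) return grpc::Status::OK;
    LoadStatsResponse response;
    if (request.cluster_stats_size() > 0) {
      response.add_clusters(request.cluster_stats(0).cluster_name());
    }
    // Ask the client to report every second.
    response.mutable_load_reporting_interval()->set_seconds(1);
    stream->Write(response);
    // Drain the periodic load reports until the client closes the stream.
    while (stream->Read(&request)) {
    }
    return grpc::Status::OK;
  }
};

}  // namespace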

@@ -33,6 +33,7 @@
"\x1Egrpc.max_request_message_bytes"
"\x1Fgrpc.max_response_message_bytes"
"$/grpc.lb.v1.LoadBalancer/BalanceLoad"
"A/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats"
"6/envoy.api.v2.EndpointDiscoveryService/StreamEndpoints"
"\x1C/grpc.health.v1.Health/Watch"
"P/envoy.service.discovery.v2.AggregatedDiscoveryService/StreamAggregatedResources"

@@ -188,7 +188,7 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
private:
static void RecordRecvTrailingMetadata(
void* arg, MetadataInterface* recv_trailing_metadata,
void* arg, grpc_error* error, MetadataInterface* recv_trailing_metadata,
CallState* call_state) {
TrailingMetadataHandler* self =
static_cast<TrailingMetadataHandler*>(arg);

@@ -491,8 +491,8 @@ grpc_cc_test(
"//:grpc",
"//:grpc++",
"//:grpc_resolver_fake",
"//src/proto/grpc/lb/v1:load_balancer_proto",
"//src/proto/grpc/lb/v2:xds_for_test_proto",
"//src/proto/grpc/lb/v2:eds_for_test_proto",
"//src/proto/grpc/lb/v2:lrs_for_test_proto",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",

File diff suppressed because it is too large (test/cpp/end2end/xds_end2end_test.cc).

@@ -63,6 +63,7 @@ CONFIG = [
'grpc.max_response_message_bytes',
# well known method names
'/grpc.lb.v1.LoadBalancer/BalanceLoad',
'/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats',
'/envoy.api.v2.EndpointDiscoveryService/StreamEndpoints',
'/grpc.health.v1.Health/Watch',
'/envoy.service.discovery.v2.AggregatedDiscoveryService/StreamAggregatedResources',

@@ -5307,9 +5307,12 @@
"grpc_test_util"
],
"headers": [
"src/proto/grpc/lb/v2/xds_for_test.grpc.pb.h",
"src/proto/grpc/lb/v2/xds_for_test.pb.h",
"src/proto/grpc/lb/v2/xds_for_test_mock.grpc.pb.h"
"src/proto/grpc/lb/v2/eds_for_test.grpc.pb.h",
"src/proto/grpc/lb/v2/eds_for_test.pb.h",
"src/proto/grpc/lb/v2/eds_for_test_mock.grpc.pb.h",
"src/proto/grpc/lb/v2/lrs_for_test.grpc.pb.h",
"src/proto/grpc/lb/v2/lrs_for_test.pb.h",
"src/proto/grpc/lb/v2/lrs_for_test_mock.grpc.pb.h"
],
"is_filegroup": false,
"language": "c++",
@@ -9467,7 +9470,6 @@
"gpr",
"grpc_base",
"grpc_client_channel",
"grpc_lb_upb",
"grpc_resolver_fake"
],
"headers": [
@@ -9498,7 +9500,6 @@
"gpr",
"grpc_base",
"grpc_client_channel",
"grpc_lb_upb",
"grpc_resolver_fake",
"grpc_secure"
],
