Merge pull request #22280 from donnadionne/route_response

Xds Routing LB Policy
donnadionne authored, committed by GitHub
commit baa442e6e4
28 changed files (lines changed):
  1. BUILD (14)
  2. BUILD.gn (1)
  3. CMakeLists.txt (2)
  4. Makefile (2)
  5. build_autogenerated.yaml (2)
  6. config.m4 (1)
  7. config.w32 (1)
  8. gRPC-Core.podspec (1)
  9. grpc.gemspec (1)
  10. grpc.gyp (2)
  11. include/grpc/impl/codegen/grpc_types.h (5)
  12. package.xml (1)
  13. src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc (833)
  14. src/core/ext/filters/client_channel/xds/xds_api.cc (140)
  15. src/core/ext/filters/client_channel/xds/xds_api.h (17)
  16. src/core/ext/filters/client_channel/xds/xds_client.cc (118)
  17. src/core/ext/filters/client_channel/xds/xds_client.h (4)
  18. src/core/plugin_registry/grpc_plugin_registry.cc (4)
  19. src/core/plugin_registry/grpc_unsecure_plugin_registry.cc (4)
  20. src/proto/grpc/testing/echo.proto (28)
  21. src/proto/grpc/testing/xds/lds_rds_for_test.proto (1)
  22. src/python/grpcio/grpc_core_dependencies.py (1)
  23. test/cpp/end2end/test_service_impl.cc (334)
  24. test/cpp/end2end/test_service_impl.h (344)
  25. test/cpp/end2end/xds_end2end_test.cc (597)
  26. test/cpp/util/grpc_tool_test.cc (43)
  27. tools/doxygen/Doxyfile.c++.internal (1)
  28. tools/doxygen/Doxyfile.core.internal (1)

BUILD

@@ -322,6 +322,7 @@ grpc_cc_library(
"grpc_lb_policy_eds",
"grpc_lb_policy_grpclb",
"grpc_lb_policy_lrs",
"grpc_lb_policy_xds_routing",
"grpc_resolver_xds",
],
)
@@ -341,6 +342,7 @@ grpc_cc_library(
"grpc_lb_policy_eds_secure",
"grpc_lb_policy_grpclb_secure",
"grpc_lb_policy_lrs_secure",
"grpc_lb_policy_xds_routing",
"grpc_resolver_xds_secure",
"grpc_secure",
"grpc_transport_chttp2_client_secure",
@@ -1454,6 +1456,18 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_lb_policy_xds_routing",
srcs = [
"src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc",
],
language = "c++",
deps = [
"grpc_base",
"grpc_client_channel",
],
)
grpc_cc_library(
name = "grpc_lb_address_filtering",
srcs = [

@@ -248,6 +248,7 @@ config("grpc_config") {
"src/core/ext/filters/client_channel/lb_policy/xds/eds.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc",
"src/core/ext/filters/client_channel/lb_policy/xds/xds.h",
"src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc",
"src/core/ext/filters/client_channel/lb_policy_factory.h",
"src/core/ext/filters/client_channel/lb_policy_registry.cc",
"src/core/ext/filters/client_channel/lb_policy_registry.h",

@@ -1330,6 +1330,7 @@ add_library(grpc
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc
src/core/ext/filters/client_channel/parse_address.cc
@@ -1990,6 +1991,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc
src/core/ext/filters/client_channel/local_subchannel_pool.cc
src/core/ext/filters/client_channel/parse_address.cc

@@ -3655,6 +3655,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/parse_address.cc \
@@ -4289,6 +4290,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/parse_address.cc \

@@ -757,6 +757,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc
- src/core/ext/filters/client_channel/parse_address.cc
@@ -1596,6 +1597,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
- src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
- src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc
- src/core/ext/filters/client_channel/local_subchannel_pool.cc
- src/core/ext/filters/client_channel/parse_address.cc

@@ -65,6 +65,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/local_subchannel_pool.cc \
src/core/ext/filters/client_channel/parse_address.cc \

@@ -34,6 +34,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\cds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\eds.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\lrs.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy\\xds\\xds_routing.cc " +
"src\\core\\ext\\filters\\client_channel\\lb_policy_registry.cc " +
"src\\core\\ext\\filters\\client_channel\\local_subchannel_pool.cc " +
"src\\core\\ext\\filters\\client_channel\\parse_address.cc " +

@@ -231,6 +231,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds.h',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy_factory.h',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.h',

@@ -153,6 +153,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/eds.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_factory.h )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.cc )
s.files += %w( src/core/ext/filters/client_channel/lb_policy_registry.h )

@@ -458,6 +458,7 @@
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/parse_address.cc',
@@ -954,6 +955,7 @@
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/parse_address.cc',

@@ -359,6 +359,11 @@ typedef struct {
* The default is 15 seconds. */
#define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \
"grpc.xds_resource_does_not_exist_timeout_ms"
/* If set, enables the xds routing policy. This boolean argument is currently
 * disabled by default; it will be flipped to enabled by default once the
 * functionality proves stable, and will eventually be removed entirely. */
#define GRPC_ARG_XDS_ROUTING_ENABLED "grpc.xds_routing_enabled"
/** If non-zero, grpc server's cronet compression workaround will be enabled */
#define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
"grpc.workaround.cronet_compression"

@@ -133,6 +133,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/eds.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_registry.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy_registry.h" role="src" />

@@ -0,0 +1,833 @@
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/timer.h"
// How long to keep a child policy around after it has been removed from the
// config, in case a subsequent update adds it back (15 minutes).
#define GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
namespace grpc_core {
TraceFlag grpc_xds_routing_lb_trace(false, "xds_routing_lb");
namespace {
constexpr char kXdsRouting[] = "xds_routing_experimental";
// Config for xds_routing LB policy.
class XdsRoutingLbConfig : public LoadBalancingPolicy::Config {
public:
struct Matcher {
std::string service;
std::string method;
};
struct Route {
Matcher matcher;
std::string action;
};
using RouteTable = std::vector<Route>;
using ActionMap =
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
XdsRoutingLbConfig(ActionMap action_map, RouteTable route_table)
: action_map_(std::move(action_map)),
route_table_(std::move(route_table)) {}
const char* name() const override { return kXdsRouting; }
const ActionMap& action_map() const { return action_map_; }
const RouteTable& route_table() const { return route_table_; }
private:
ActionMap action_map_;
RouteTable route_table_;
};
// xds_routing LB policy.
class XdsRoutingLb : public LoadBalancingPolicy {
public:
explicit XdsRoutingLb(Args args);
const char* name() const override { return kXdsRouting; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
ChildPickerWrapper(std::string name,
std::unique_ptr<SubchannelPicker> picker)
: name_(std::move(name)), picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
const std::string& name() const { return name_; }
private:
std::string name_;
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using prefix or path matching and then delegates to that
// child's picker.
class RoutePicker : public SubchannelPicker {
public:
struct Route {
XdsRoutingLbConfig::Matcher matcher;
RefCountedPtr<ChildPickerWrapper> picker;
};
// Maintains the ordered xds route table as provided by the RDS response.
using RouteTable = std::vector<Route>;
explicit RoutePicker(RouteTable route_table)
: route_table_(std::move(route_table)) {}
PickResult Pick(PickArgs args) override;
private:
RouteTable route_table_;
};
// Each XdsRoutingChild holds a ref to its parent XdsRoutingLb.
class XdsRoutingChild : public InternallyRefCounted<XdsRoutingChild> {
public:
XdsRoutingChild(RefCountedPtr<XdsRoutingLb> xds_routing_policy,
const std::string& name);
~XdsRoutingChild();
void Orphan() override;
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses,
const grpc_channel_args* args);
void ExitIdleLocked();
void ResetBackoffLocked();
void DeactivateLocked();
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<XdsRoutingChild> xds_routing_child)
: xds_routing_child_(std::move(xds_routing_child)) {}
~Helper() { xds_routing_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, StringView message) override;
private:
RefCountedPtr<XdsRoutingChild> xds_routing_child_;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
// The owning LB policy.
RefCountedPtr<XdsRoutingLb> xds_routing_policy_;
// Points to the corresponding key in XdsRoutingLb::actions_.
const std::string& name_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
bool seen_failure_since_ready_ = false;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
bool shutdown_ = false;
};
~XdsRoutingLb();
void ShutdownLocked() override;
void UpdateStateLocked();
// Current config from the resolver.
RefCountedPtr<XdsRoutingLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
// Children.
std::map<std::string, OrphanablePtr<XdsRoutingChild>> actions_;
};
//
// XdsRoutingLb::RoutePicker
//
XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) {
absl::string_view path;
// TODO(roth): Using const auto& here triggers a warning in the macOS and
// Windows builds, because *(args.initial_metadata) returns values, not
// references.
for (const auto p : *(args.initial_metadata)) {
if (p.first == ":path") {
path = p.second;
break;
}
}
std::vector<absl::string_view> path_elements =
absl::StrSplit(path.substr(1), '/');
for (const Route& route : route_table_) {
if ((path_elements[0] == route.matcher.service &&
(path_elements[1] == route.matcher.method ||
route.matcher.method.empty())) ||
(route.matcher.service.empty() && route.matcher.method.empty())) {
return route.picker.get()->Pick(args);
}
}
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds routing picker: no matching route"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
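For reference, Pick() assumes the gRPC wire form of the :path metadata, "/package.Service/Method". A self-contained sketch of the same decomposition (the path value here is hypothetical):

#include <cassert>
#include <vector>
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"

int main() {
  // A hypothetical :path value as it appears in initial metadata.
  absl::string_view path = "/grpc.testing.EchoTestService/Echo";
  // Mirror RoutePicker::Pick: drop the leading '/' and split on '/'.
  std::vector<absl::string_view> elements = absl::StrSplit(path.substr(1), '/');
  assert(elements[0] == "grpc.testing.EchoTestService");  // vs. Matcher::service
  assert(elements[1] == "Echo");                          // vs. Matcher::method
  return 0;
}

Routes are tried in table order, so the route with an empty matcher (the default) must come last; the config validation in the factory below enforces this.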
//
// XdsRoutingLb
//
XdsRoutingLb::XdsRoutingLb(Args args) : LoadBalancingPolicy(std::move(args)) {}
XdsRoutingLb::~XdsRoutingLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] destroying xds_routing LB policy",
this);
}
}
void XdsRoutingLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] shutting down", this);
}
shutting_down_ = true;
actions_.clear();
}
void XdsRoutingLb::ExitIdleLocked() {
for (auto& p : actions_) p.second->ExitIdleLocked();
}
void XdsRoutingLb::ResetBackoffLocked() {
for (auto& p : actions_) p.second->ResetBackoffLocked();
}
void XdsRoutingLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] Received update", this);
}
// Update config.
config_ = std::move(args.config);
// Deactivate the actions not in the new config.
for (const auto& p : actions_) {
const std::string& name = p.first;
XdsRoutingChild* child = p.second.get();
if (config_->action_map().find(name) == config_->action_map().end()) {
child->DeactivateLocked();
}
}
// Add or update the actions in the new config.
for (const auto& p : config_->action_map()) {
const std::string& name = p.first;
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
auto it = actions_.find(name);
if (it == actions_.end()) {
it = actions_.emplace(std::make_pair(name, nullptr)).first;
it->second = MakeOrphanable<XdsRoutingChild>(
Ref(DEBUG_LOCATION, "XdsRoutingChild"), it->first);
}
it->second->UpdateLocked(config, args.addresses, args.args);
}
}
void XdsRoutingLb::UpdateStateLocked() {
// Also count the number of children in each state, to determine the
// overall state.
size_t num_ready = 0;
size_t num_connecting = 0;
size_t num_idle = 0;
size_t num_transient_failures = 0;
for (const auto& p : actions_) {
const auto& child_name = p.first;
const XdsRoutingChild* child = p.second.get();
// Skip the actions that are not in the latest update.
if (config_->action_map().find(child_name) == config_->action_map().end()) {
continue;
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
++num_ready;
break;
}
case GRPC_CHANNEL_CONNECTING: {
++num_connecting;
break;
}
case GRPC_CHANNEL_IDLE: {
++num_idle;
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
++num_transient_failures;
break;
}
default:
GPR_UNREACHABLE_CODE(return );
}
}
// Determine aggregated connectivity state.
grpc_connectivity_state connectivity_state;
if (num_ready > 0) {
connectivity_state = GRPC_CHANNEL_READY;
} else if (num_connecting > 0) {
connectivity_state = GRPC_CHANNEL_CONNECTING;
} else if (num_idle > 0) {
connectivity_state = GRPC_CHANNEL_IDLE;
} else {
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] connectivity changed to %s", this,
ConnectivityStateName(connectivity_state));
}
std::unique_ptr<SubchannelPicker> picker;
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
RoutePicker::RouteTable route_table;
for (const auto& config_route : config_->route_table()) {
RoutePicker::Route route;
route.matcher = config_route.matcher;
route.picker = actions_[config_route.action]->picker_wrapper();
if (route.picker == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] child %s has not yet returned a "
"picker; creating a QueuePicker.",
this, config_route.action.c_str());
}
route.picker = MakeRefCounted<ChildPickerWrapper>(
config_route.action, absl::make_unique<QueuePicker>(
Ref(DEBUG_LOCATION, "QueuePicker")));
}
route_table.push_back(std::move(route));
}
picker = absl::make_unique<RoutePicker>(std::move(route_table));
break;
}
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
picker =
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
picker = absl::make_unique<TransientFailurePicker>(grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TRANSIENT_FAILURE from XdsRoutingLb"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
channel_control_helper()->UpdateState(connectivity_state, std::move(picker));
}
//
// XdsRoutingLb::XdsRoutingChild
//
XdsRoutingLb::XdsRoutingChild::XdsRoutingChild(
RefCountedPtr<XdsRoutingLb> xds_routing_policy, const std::string& name)
: xds_routing_policy_(std::move(xds_routing_policy)), name_(name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO, "[xds_routing_lb %p] created XdsRoutingChild %p for %s",
xds_routing_policy_.get(), this, name_.c_str());
}
}
XdsRoutingLb::XdsRoutingChild::~XdsRoutingChild() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p: destroying child",
xds_routing_policy_.get(), this);
}
xds_routing_policy_.reset(DEBUG_LOCATION, "XdsRoutingChild");
}
void XdsRoutingLb::XdsRoutingChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p %s: shutting down child",
xds_routing_policy_.get(), this, name_.c_str());
}
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
xds_routing_policy_->interested_parties());
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_);
}
shutdown_ = true;
Unref();
}
OrphanablePtr<LoadBalancingPolicy>
XdsRoutingLb::XdsRoutingChild::CreateChildPolicyLocked(
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = xds_routing_policy_->combiner();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_xds_routing_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p %s: Created new child "
"policy handler %p",
xds_routing_policy_.get(), this, name_.c_str(), lb_policy.get());
}
// Add the xDS policy's interested_parties pollset_set to that of the newly
// created child policy, so that the child policy makes progress upon activity
// on the xDS LB policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
xds_routing_policy_->interested_parties());
return lb_policy;
}
void XdsRoutingLb::XdsRoutingChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const ServerAddressList& addresses, const grpc_channel_args* args) {
if (xds_routing_policy_->shutting_down_) return;
// Reactivate if needed.
if (delayed_removal_timer_callback_pending_) {
delayed_removal_timer_callback_pending_ = false;
grpc_timer_cancel(&delayed_removal_timer_);
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
// Construct update args.
UpdateArgs update_args;
update_args.config = std::move(config);
update_args.addresses = addresses;
update_args.args = grpc_channel_args_copy(args);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] XdsRoutingChild %p %s: Updating child "
"policy handler %p",
xds_routing_policy_.get(), this, name_.c_str(),
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
}
void XdsRoutingLb::XdsRoutingChild::ExitIdleLocked() {
child_policy_->ExitIdleLocked();
}
void XdsRoutingLb::XdsRoutingChild::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
}
void XdsRoutingLb::XdsRoutingChild::DeactivateLocked() {
// If already deactivated, don't do that again.
if (delayed_removal_timer_callback_pending_ == true) return;
// Start a timer to delete the child.
Ref(DEBUG_LOCATION, "XdsRoutingChild+timer").release();
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(
&delayed_removal_timer_,
ExecCtx::Get()->Now() + GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS,
&on_delayed_removal_timer_);
delayed_removal_timer_callback_pending_ = true;
}
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimer(void* arg,
grpc_error* error) {
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg);
self->xds_routing_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error));
}
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked(
void* arg, grpc_error* error) {
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg);
self->delayed_removal_timer_callback_pending_ = false;
if (error == GRPC_ERROR_NONE && !self->shutdown_) {
self->xds_routing_policy_->actions_.erase(self->name_);
}
self->Unref(DEBUG_LOCATION, "XdsRoutingChild+timer");
}
//
// XdsRoutingLb::XdsRoutingChild::Helper
//
RefCountedPtr<SubchannelInterface>
XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr;
return xds_routing_child_->xds_routing_policy_->channel_control_helper()
->CreateSubchannel(args);
}
void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_routing_lb %p] child %s: received update: state=%s picker=%p",
xds_routing_child_->xds_routing_policy_.get(),
xds_routing_child_->name_.c_str(), ConnectivityStateName(state),
picker.get());
}
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
// Cache the picker in the XdsRoutingChild.
xds_routing_child_->picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(
xds_routing_child_->name_, std::move(picker));
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
// changes until we go back into state READY.
if (!xds_routing_child_->seen_failure_since_ready_) {
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
xds_routing_child_->seen_failure_since_ready_ = true;
}
} else {
if (state != GRPC_CHANNEL_READY) return;
xds_routing_child_->seen_failure_since_ready_ = false;
}
xds_routing_child_->connectivity_state_ = state;
// Notify the LB policy.
xds_routing_child_->xds_routing_policy_->UpdateStateLocked();
}
void XdsRoutingLb::XdsRoutingChild::Helper::RequestReresolution() {
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
xds_routing_child_->xds_routing_policy_->channel_control_helper()
->RequestReresolution();
}
void XdsRoutingLb::XdsRoutingChild::Helper::AddTraceEvent(
TraceSeverity severity, StringView message) {
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return;
xds_routing_child_->xds_routing_policy_->channel_control_helper()
->AddTraceEvent(severity, message);
}
//
// factory
//
class XdsRoutingLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<XdsRoutingLb>(std::move(args));
}
const char* name() const override { return kXdsRouting; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// xds_routing was mentioned as a policy in the deprecated
// loadBalancingPolicy field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:xds_routing policy requires "
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
return nullptr;
}
std::vector<grpc_error*> error_list;
// action map.
XdsRoutingLbConfig::ActionMap action_map;
std::set<std::string /*action_name*/> actions_to_be_used;
auto it = json.object_value().find("actions");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error:required field not present"));
} else if (it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions error:type should be object"));
} else {
for (const auto& p : it->second.object_value()) {
if (p.first.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:actions element error: name cannot be empty"));
continue;
}
RefCountedPtr<LoadBalancingPolicy::Config> child_config;
std::vector<grpc_error*> child_errors =
ParseChildConfig(p.second, &child_config);
if (!child_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:actions name:", p.first).c_str());
for (grpc_error* child_error : child_errors) {
error = grpc_error_add_child(error, child_error);
}
error_list.push_back(error);
} else {
action_map[p.first] = std::move(child_config);
actions_to_be_used.insert(p.first);
}
}
}
if (action_map.empty()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid actions configured"));
}
XdsRoutingLbConfig::RouteTable route_table;
it = json.object_value().find("routes");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:required field not present"));
} else if (it->second.type() != Json::Type::ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:routes error:type should be array"));
} else {
const Json::Array& array = it->second.array_value();
for (size_t i = 0; i < array.size(); ++i) {
XdsRoutingLbConfig::Route route;
std::vector<grpc_error*> route_errors =
ParseRoute(array[i], action_map, &route, &actions_to_be_used);
if (!route_errors.empty()) {
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("field:routes element: ", i, " error").c_str());
for (grpc_error* route_error : route_errors) {
error = grpc_error_add_child(error, route_error);
}
error_list.push_back(error);
}
route_table.emplace_back(std::move(route));
}
}
if (route_table.empty()) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid routes configured");
error_list.push_back(error);
} else if (!route_table.back().matcher.service.empty() ||
!route_table.back().matcher.method.empty()) {
// back() is safe here only because route_table is known to be non-empty.
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"default route must not contain service or method");
error_list.push_back(error);
}
if (!actions_to_be_used.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"some actions were not referenced by any route"));
}
if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"xds_routing_experimental LB policy config", &error_list);
return nullptr;
}
return MakeRefCounted<XdsRoutingLbConfig>(std::move(action_map),
std::move(route_table));
}
private:
static std::vector<grpc_error*> ParseChildConfig(
const Json& json,
RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
auto it = json.object_value().find("child_policy");
if (it == json.object_value().end()) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
} else {
grpc_error* parse_error = GRPC_ERROR_NONE;
*child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
it->second, &parse_error);
if (*child_config == nullptr) {
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
std::vector<grpc_error*> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
}
return error_list;
}
static std::vector<grpc_error*> ParseMethodName(
const Json& json, XdsRoutingLbConfig::Matcher* route_config) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
// Parse service
auto it = json.object_value().find("service");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:service error: should be string"));
} else {
route_config->service = it->second.string_value();
}
}
// Parse method
it = json.object_value().find("method");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:method error: should be string"));
} else {
route_config->method = it->second.string_value();
}
}
if (route_config->service.empty() && !route_config->method.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"service is empty when method is not"));
}
return error_list;
}
static std::vector<grpc_error*> ParseRoute(
const Json& json, const XdsRoutingLbConfig::ActionMap& action_map,
XdsRoutingLbConfig::Route* route,
std::set<std::string /*action_name*/>* actions_to_be_used) {
std::vector<grpc_error*> error_list;
if (json.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"value should be of type object"));
return error_list;
}
// Parse MethodName.
auto it = json.object_value().find("methodName");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:methodName error:required field missing"));
} else {
std::vector<grpc_error*> method_name_errors =
ParseMethodName(it->second, &route->matcher);
if (!method_name_errors.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
"field:methodName", &method_name_errors));
}
}
// Parse action.
it = json.object_value().find("action");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:action error:required field missing"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:action error:should be of type string"));
} else {
route->action = it->second.string_value();
if (route->action.empty()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:action error:cannot be empty"));
} else {
// Validate action exists and mark it as used.
if (action_map.find(route->action) == action_map.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
absl::StrCat("field:action error:", route->action,
" does not exist")
.c_str()));
}
actions_to_be_used->erase(route->action);
}
}
return error_list;
}
};
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
void grpc_lb_policy_xds_routing_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::XdsRoutingLbFactory>());
}
void grpc_lb_policy_xds_routing_shutdown() {}
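Putting the factory's validation rules together, a service config this policy accepts has the following shape (a hedged example; the cluster and service names are illustrative). Every action must be referenced by at least one route, and the last route must have an empty matcher so that it acts as the default:

{
  "loadBalancingConfig": [ {
    "xds_routing_experimental": {
      "actions": {
        "cds:cluster_1": {
          "child_policy": [ { "cds_experimental": { "cluster": "cluster_1" } } ]
        },
        "cds:cluster_2": {
          "child_policy": [ { "cds_experimental": { "cluster": "cluster_2" } } ]
        }
      },
      "routes": [
        {
          "methodName": { "service": "grpc.testing.EchoTestService", "method": "Echo" },
          "action": "cds:cluster_1"
        },
        { "methodName": {}, "action": "cds:cluster_2" }
      ]
    }
  } ]
}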

@@ -24,6 +24,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include <grpc/impl/codegen/log.h>
#include <grpc/support/alloc.h>
@@ -951,7 +952,8 @@ MatchType DomainPatternMatchType(const std::string& domain_pattern) {
grpc_error* RouteConfigParse(
XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_RouteConfiguration* route_config,
const std::string& expected_server_name, XdsApi::RdsUpdate* rds_update) {
const std::string& expected_server_name, const bool xds_routing_enabled,
XdsApi::RdsUpdate* rds_update) {
MaybeLogRouteConfiguration(client, tracer, route_config);
// Get the virtual hosts.
size_t size;
@@ -1011,41 +1013,105 @@ grpc_error* RouteConfigParse(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No route found in the virtual host.");
}
// Only look at the last one in the route list (the default route),
const envoy_api_v2_route_Route* route = routes[size - 1];
// Validate that the match field must have a prefix field which is an empty
// string.
const envoy_api_v2_route_RouteMatch* match =
envoy_api_v2_route_Route_match(route);
if (!envoy_api_v2_route_RouteMatch_has_prefix(match)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No prefix field found in RouteMatch.");
}
const upb_strview prefix = envoy_api_v2_route_RouteMatch_prefix(match);
if (!upb_strview_eql(prefix, upb_strview_makez("")) &&
!upb_strview_eql(prefix, upb_strview_makez("/"))) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Prefix is not \"\" or \"/\".");
}
if (!envoy_api_v2_route_Route_has_route(route)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No RouteAction found in route.");
// If xds_routing is not enabled, only look at the last route in the list
// (the default route).
size_t start_index = xds_routing_enabled ? 0 : size - 1;
for (size_t i = start_index; i < size; ++i) {
const envoy_api_v2_route_Route* route = routes[i];
const envoy_api_v2_route_RouteMatch* match =
envoy_api_v2_route_Route_match(route);
XdsApi::RdsRoute rds_route;
if (envoy_api_v2_route_RouteMatch_has_prefix(match)) {
upb_strview prefix = envoy_api_v2_route_RouteMatch_prefix(match);
// Empty prefix "" is accepted.
if (prefix.size > 0) {
// Prefix "/" is accepted.
if (prefix.data[0] != '/') {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Prefix does not start with a /");
}
if (prefix.size > 1) {
std::vector<absl::string_view> prefix_elements = absl::StrSplit(
absl::string_view(prefix.data, prefix.size).substr(1),
absl::MaxSplits('/', 1));
if (prefix_elements.size() != 2) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Prefix not in the required format of /service/");
} else if (!prefix_elements[1].empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Prefix does not end with a /");
} else if (prefix_elements[0].empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Prefix contains empty service name");
}
rds_route.service = std::string(prefix_elements[0]);
}
}
} else if (envoy_api_v2_route_RouteMatch_has_path(match)) {
upb_strview path = envoy_api_v2_route_RouteMatch_path(match);
if (path.size == 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path if set cannot be empty");
}
if (path.data[0] != '/') {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path does not start with a /");
}
std::vector<absl::string_view> path_elements = absl::StrSplit(
absl::string_view(path.data, path.size).substr(1), '/');
if (path_elements.size() != 2) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path not in the required format of /service/method");
} else if (path_elements[0].empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path contains empty service name");
} else if (path_elements[1].empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Path contains empty method name");
}
rds_route.service = std::string(path_elements[0]);
rds_route.method = std::string(path_elements[1]);
} else {
// TODO(donnadionne): We may change this behavior once we decide how to
// handle unsupported fields.
continue;
}
if (!envoy_api_v2_route_Route_has_route(route)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No RouteAction found in route.");
}
const envoy_api_v2_route_RouteAction* route_action =
envoy_api_v2_route_Route_route(route);
// Get the cluster in the RouteAction.
if (!envoy_api_v2_route_RouteAction_has_cluster(route_action)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No cluster found in RouteAction.");
}
const upb_strview action =
envoy_api_v2_route_RouteAction_cluster(route_action);
if (action.size == 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"RouteAction contains empty cluster.");
}
rds_route.cluster_name = std::string(action.data, action.size);
rds_update->routes.emplace_back(std::move(rds_route));
}
const envoy_api_v2_route_RouteAction* route_action =
envoy_api_v2_route_Route_route(route);
// Get the cluster in the RouteAction.
if (!envoy_api_v2_route_RouteAction_has_cluster(route_action)) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No cluster found in RouteAction.");
if (rds_update->routes.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid routes specified.");
} else {
if (!rds_update->routes.back().service.empty() ||
!rds_update->routes.back().method.empty()) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Default route must have empty service and method");
}
}
const upb_strview cluster =
envoy_api_v2_route_RouteAction_cluster(route_action);
rds_update->cluster_name = std::string(cluster.data, cluster.size);
return GRPC_ERROR_NONE;
}
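Under these rules, a RouteConfiguration of the following shape (proto text form; the names are illustrative) parses into three RdsRoutes, the last one being the required default route. A prefix must have the form "/service/" (or be "" or "/"), while a path must have the form "/service/method":

virtual_hosts {
  domains: "*"
  routes {
    match { path: "/grpc.testing.EchoTestService/Echo" }
    route { cluster: "cluster_1" }
  }
  routes {
    match { prefix: "/grpc.testing.EchoTest1Service/" }
    route { cluster: "cluster_2" }
  }
  routes {
    match { prefix: "" }
    route { cluster: "default_cluster" }
  }
}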
grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const bool xds_routing_enabled,
absl::optional<XdsApi::LdsUpdate>* lds_update,
upb_arena* arena) {
// Get the resources from the response.
@@ -1091,8 +1157,9 @@ grpc_error* LdsResponseParse(XdsClient* client, TraceFlag* tracer,
envoy_config_filter_network_http_connection_manager_v2_HttpConnectionManager_route_config(
http_connection_manager);
XdsApi::RdsUpdate rds_update;
grpc_error* error = RouteConfigParse(client, tracer, route_config,
expected_server_name, &rds_update);
grpc_error* error =
RouteConfigParse(client, tracer, route_config, expected_server_name,
xds_routing_enabled, &rds_update);
if (error != GRPC_ERROR_NONE) return error;
lds_update->emplace();
(*lds_update)->rds_update.emplace(std::move(rds_update));
@@ -1123,6 +1190,7 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
const envoy_api_v2_DiscoveryResponse* response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
absl::optional<XdsApi::RdsUpdate>* rds_update,
upb_arena* arena) {
// Get the resources from the response.
@@ -1151,8 +1219,9 @@ grpc_error* RdsResponseParse(XdsClient* client, TraceFlag* tracer,
if (!upb_strview_eql(name, expected_name)) continue;
// Parse the route_config.
XdsApi::RdsUpdate local_rds_update;
grpc_error* error = RouteConfigParse(
client, tracer, route_config, expected_server_name, &local_rds_update);
grpc_error* error =
RouteConfigParse(client, tracer, route_config, expected_server_name,
xds_routing_enabled, &local_rds_update);
if (error != GRPC_ERROR_NONE) return error;
rds_update->emplace(std::move(local_rds_update));
return GRPC_ERROR_NONE;
@@ -1432,6 +1501,7 @@ grpc_error* EdsResponseParse(
grpc_error* XdsApi::ParseAdsResponse(
const grpc_slice& encoded_response, const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
const std::set<StringView>& expected_cluster_names,
const std::set<StringView>& expected_eds_service_names,
absl::optional<LdsUpdate>* lds_update,
@@ -1463,11 +1533,11 @@
// Parse the response according to the resource type.
if (*type_url == kLdsTypeUrl) {
return LdsResponseParse(client_, tracer_, response, expected_server_name,
lds_update, arena.ptr());
xds_routing_enabled, lds_update, arena.ptr());
} else if (*type_url == kRdsTypeUrl) {
return RdsResponseParse(client_, tracer_, response, expected_server_name,
expected_route_config_name, rds_update,
arena.ptr());
expected_route_config_name, xds_routing_enabled,
rds_update, arena.ptr());
} else if (*type_url == kCdsTypeUrl) {
return CdsResponseParse(client_, tracer_, response, expected_cluster_names,
cds_update_map, arena.ptr());

@@ -44,12 +44,22 @@ class XdsApi {
static const char* kCdsTypeUrl;
static const char* kEdsTypeUrl;
struct RdsUpdate {
// The name to use in the CDS request.
struct RdsRoute {
std::string service;
std::string method;
std::string cluster_name;
bool operator==(const RdsRoute& other) const {
return (service == other.service && method == other.method &&
cluster_name == other.cluster_name);
}
};
struct RdsUpdate {
std::vector<RdsRoute> routes;
bool operator==(const RdsUpdate& other) const {
return cluster_name == other.cluster_name;
return routes == other.routes;
}
};
@@ -247,6 +257,7 @@ class XdsApi {
const grpc_slice& encoded_response,
const std::string& expected_server_name,
const std::string& expected_route_config_name,
const bool xds_routing_enabled,
const std::set<StringView>& expected_cluster_names,
const std::set<StringView>& expected_eds_service_names,
absl::optional<LdsUpdate>* lds_update,

@@ -22,6 +22,7 @@
#include <limits.h>
#include <string.h>
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include <grpc/byte_buffer_reader.h>
@@ -896,15 +897,22 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LDS update received: route_config_name=%s, "
"cluster_name=%s",
"[xds_client %p] LDS update received: route_config_name=%s",
xds_client(),
(!lds_update->route_config_name.empty()
? lds_update->route_config_name.c_str()
: "<inlined>"),
(lds_update->rds_update.has_value()
? lds_update->rds_update->cluster_name.c_str()
: "<to be obtained via RDS>"));
: "<inlined>"));
if (lds_update->rds_update.has_value()) {
gpr_log(GPR_INFO, " RouteConfiguration contains %lu routes",
lds_update->rds_update.value().routes.size());
for (const auto& route : lds_update->rds_update.value().routes) {
gpr_log(GPR_INFO,
" route: { service=\"%s\", method=\"%s\", cluster=\"%s\" }",
route.service.c_str(), route.method.c_str(),
route.cluster_name.c_str());
}
}
}
auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
@@ -930,7 +938,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
// the watcher immediately.
RefCountedPtr<ServiceConfig> service_config;
grpc_error* error = xds_client()->CreateServiceConfig(
xds_client()->lds_result_->rds_update->cluster_name, &service_config);
xds_client()->lds_result_->rds_update.value(), &service_config);
if (error == GRPC_ERROR_NONE) {
xds_client()->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
@@ -956,8 +964,17 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] RDS update received: cluster_name=%s",
xds_client(), rds_update->cluster_name.c_str());
gpr_log(GPR_INFO,
"[xds_client %p] RDS update received; RouteConfiguration contains "
"%lu routes",
xds_client(), rds_update.value().routes.size());
for (const auto& route : rds_update.value().routes) {
gpr_log(GPR_INFO,
" route: { service=\"%s\", method=\"%s\", cluster=\"%s\" }",
route.service.c_str(), route.method.c_str(),
route.cluster_name.c_str());
}
}
auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
auto& state =
@@ -977,7 +994,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
// Notify the watcher.
RefCountedPtr<ServiceConfig> service_config;
grpc_error* error = xds_client()->CreateServiceConfig(
xds_client()->rds_result_->cluster_name, &service_config);
xds_client()->rds_result_.value(), &service_config);
if (error == GRPC_ERROR_NONE) {
xds_client()->service_config_watcher_->OnServiceConfigChanged(
std::move(service_config));
@@ -1226,7 +1243,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
(xds_client->lds_result_.has_value()
? xds_client->lds_result_->route_config_name
: ""),
ads_calld->ClusterNamesForRequest(),
xds_client->xds_routing_enabled_, ads_calld->ClusterNamesForRequest(),
ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update,
&cds_update_map, &eds_update_map, &version, &nonce, &type_url);
grpc_slice_unref_internal(response_slice);
@@ -1801,6 +1818,11 @@ grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
{15000, 0, INT_MAX});
}
bool GetXdsRoutingEnabled(const grpc_channel_args& args) {
return grpc_channel_args_find_bool(&args, GRPC_ARG_XDS_ROUTING_ENABLED,
false);
}
} // namespace
XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
@@ -1809,6 +1831,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
const grpc_channel_args& channel_args, grpc_error** error)
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout(channel_args)),
xds_routing_enabled_(GetXdsRoutingEnabled(channel_args)),
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
interested_parties_(interested_parties),
bootstrap_(
@@ -2034,22 +2057,69 @@ void XdsClient::ResetBackoff() {
}
}
namespace {
std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
return absl::StrFormat(
" \"cds:%s\":{\n"
" \"child_policy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster_name.c_str(), cluster_name.c_str());
}
std::string CreateServiceConfigRoute(const std::string& cluster_name,
const std::string& service,
const std::string& method) {
return absl::StrFormat(
" { \n"
" \"methodName\": {\n"
" \"service\": \"%s\",\n"
" \"method\": \"%s\"\n"
" },\n"
" \"action\": \"cds:%s\"\n"
" }",
service.c_str(), method.c_str(), cluster_name.c_str());
}
} // namespace
grpc_error* XdsClient::CreateServiceConfig(
const std::string& cluster_name,
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) const {
char* json;
gpr_asprintf(&json,
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" } }\n"
" ]\n"
"}",
cluster_name.c_str());
std::vector<std::string> config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_routing_experimental\":{\n"
" \"actions\":{\n");
std::vector<std::string> actions_vector;
for (size_t i = 0; i < rds_update.routes.size(); ++i) {
auto route = rds_update.routes[i];
actions_vector.push_back(
CreateServiceConfigActionCluster(route.cluster_name.c_str()));
}
config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
config_parts.push_back(
" },\n"
" \"routes\":[\n");
std::vector<std::string> routes_vector;
for (size_t i = 0; i < rds_update.routes.size(); ++i) {
auto route_info = rds_update.routes[i];
routes_vector.push_back(CreateServiceConfigRoute(
route_info.cluster_name.c_str(), route_info.service.c_str(),
route_info.method.c_str()));
}
config_parts.push_back(absl::StrJoin(routes_vector, ",\n"));
config_parts.push_back(
" ]\n"
" } }\n"
" ]\n"
"}");
std::string json = absl::StrJoin(config_parts, "");
grpc_error* error = GRPC_ERROR_NONE;
*service_config = ServiceConfig::Create(json, &error);
gpr_free(json);
*service_config = ServiceConfig::Create(json.c_str(), &error);
return error;
}
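For a concrete sense of the assembly above, take an illustrative route with service "grpc.testing.EchoTestService", method "Echo", and cluster "cluster_1". CreateServiceConfigActionCluster("cluster_1") emits the action entry

"cds:cluster_1":{
  "child_policy":[ {
    "cds_experimental":{
      "cluster": "cluster_1"
    }
  } ]
}

and CreateServiceConfigRoute("cluster_1", "grpc.testing.EchoTestService", "Echo") emits the matching route entry

{
  "methodName": {
    "service": "grpc.testing.EchoTestService",
    "method": "Echo"
  },
  "action": "cds:cluster_1"
}

StrJoin then stitches these, comma-separated, into an xds_routing_experimental config of the shape the xds_routing factory expects. Note that the default route (empty service and method) also goes through CreateServiceConfigRoute, so it is emitted with explicit empty strings.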

@@ -226,7 +226,7 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
void NotifyOnError(grpc_error* error);
grpc_error* CreateServiceConfig(
const std::string& cluster_name,
const XdsApi::RdsUpdate& rds_update,
RefCountedPtr<ServiceConfig>* service_config) const;
XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
@@ -241,6 +241,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
const grpc_millis request_timeout_;
const bool xds_routing_enabled_;
Combiner* combiner_;
grpc_pollset_set* interested_parties_;

@@ -44,6 +44,8 @@ void grpc_lb_policy_priority_init(void);
void grpc_lb_policy_priority_shutdown(void);
void grpc_lb_policy_weighted_target_init(void);
void grpc_lb_policy_weighted_target_shutdown(void);
void grpc_lb_policy_xds_routing_init(void);
void grpc_lb_policy_xds_routing_shutdown(void);
void grpc_lb_policy_pick_first_init(void);
void grpc_lb_policy_pick_first_shutdown(void);
void grpc_lb_policy_round_robin_init(void);
@@ -92,6 +94,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_priority_shutdown);
grpc_register_plugin(grpc_lb_policy_weighted_target_init,
grpc_lb_policy_weighted_target_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_routing_init,
grpc_lb_policy_xds_routing_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,

@@ -52,6 +52,8 @@ void grpc_lb_policy_priority_init(void);
void grpc_lb_policy_priority_shutdown(void);
void grpc_lb_policy_weighted_target_init(void);
void grpc_lb_policy_weighted_target_shutdown(void);
void grpc_lb_policy_xds_routing_init(void);
void grpc_lb_policy_xds_routing_shutdown(void);
void grpc_lb_policy_pick_first_init(void);
void grpc_lb_policy_pick_first_shutdown(void);
void grpc_lb_policy_round_robin_init(void);
@@ -100,6 +102,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_priority_shutdown);
grpc_register_plugin(grpc_lb_policy_weighted_target_init,
grpc_lb_policy_weighted_target_shutdown);
grpc_register_plugin(grpc_lb_policy_xds_routing_init,
grpc_lb_policy_xds_routing_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,

@@ -22,6 +22,34 @@ package grpc.testing;
service EchoTestService {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc Echo1(EchoRequest) returns (EchoResponse);
rpc Echo2(EchoRequest) returns (EchoResponse);
// An RPC that checks that the initial metadata sent by the client contains
// the expected key-value pair.
rpc CheckClientInitialMetadata(SimpleRequest) returns (SimpleResponse);
rpc RequestStream(stream EchoRequest) returns (EchoResponse);
rpc ResponseStream(EchoRequest) returns (stream EchoResponse);
rpc BidiStream(stream EchoRequest) returns (stream EchoResponse);
rpc Unimplemented(EchoRequest) returns (EchoResponse);
}
service EchoTest1Service {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc Echo1(EchoRequest) returns (EchoResponse);
rpc Echo2(EchoRequest) returns (EchoResponse);
// An RPC that checks that the initial metadata sent by the client contains
// the expected key-value pair.
rpc CheckClientInitialMetadata(SimpleRequest) returns (SimpleResponse);
rpc RequestStream(stream EchoRequest) returns (EchoResponse);
rpc ResponseStream(EchoRequest) returns (stream EchoResponse);
rpc BidiStream(stream EchoRequest) returns (stream EchoResponse);
rpc Unimplemented(EchoRequest) returns (EchoResponse);
}
service EchoTest2Service {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc Echo1(EchoRequest) returns (EchoResponse);
rpc Echo2(EchoRequest) returns (EchoResponse);
// An RPC that checks that the initial metadata sent by the client contains
// the expected key-value pair.
rpc CheckClientInitialMetadata(SimpleRequest) returns (SimpleResponse);

@@ -34,6 +34,7 @@ message RouteMatch {
// If specified, the route is a prefix rule meaning that the prefix must
// match the beginning of the *:path* header.
string prefix = 1;
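// If specified, the route is an exact-path rule: the *:path* header must
// match this value exactly.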
string path = 2;
}
}

@@ -43,6 +43,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/lb_policy/xds/cds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/eds.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc',
'src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
'src/core/ext/filters/client_channel/local_subchannel_pool.cc',
'src/core/ext/filters/client_channel/parse_address.cc',

@@ -34,7 +34,7 @@ using std::chrono::system_clock;
namespace grpc {
namespace testing {
namespace {
namespace internal {
// When echo_deadline is requested, deadline seen in the ServerContext is set in
// the response in seconds.
@@ -84,9 +84,7 @@ int MetadataMatchCount(
}
return count;
}
} // namespace
namespace {
int GetIntValueFromMetadataHelper(
const char* key,
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
@@ -125,293 +123,7 @@ void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) {
"Server called TryCancelNonblocking() to cancel the request");
}
} // namespace
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
if (request->has_param() &&
request->param().server_notify_client_when_started()) {
signaller_.SignalClientThatRpcStarted();
signaller_.ServerWaitToContinue();
}
// A bit of sleep to make sure that short deadline tests fail
if (request->has_param() && request->param().server_sleep_us() > 0) {
gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(request->param().server_sleep_us(),
GPR_TIMESPAN)));
}
if (request->has_param() && request->param().server_die()) {
gpr_log(GPR_ERROR, "The request should not reach application handler.");
GPR_ASSERT(0);
}
if (request->has_param() && request->param().has_expected_error()) {
const auto& error = request->param().expected_error();
return Status(static_cast<StatusCode>(error.code()), error.error_message(),
error.binary_error_details());
}
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel > DO_NOT_CANCEL) {
// Since this is a unary RPC, by the time this server handler is called,
// the 'request' message is already read from the client. So the scenarios
// in server_try_cancel don't make much sense. Just cancel the RPC as long
// as server_try_cancel is not DO_NOT_CANCEL
ServerTryCancel(context);
return Status::CANCELLED;
}
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {
response->mutable_param()->set_host(*host_);
}
if (request->has_param() && request->param().client_cancel_after_us()) {
{
std::unique_lock<std::mutex> lock(mu_);
signal_client_ = true;
}
while (!context->IsCancelled()) {
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(request->param().client_cancel_after_us(),
GPR_TIMESPAN)));
}
return Status::CANCELLED;
} else if (request->has_param() &&
request->param().server_cancel_after_us()) {
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)));
return Status::CANCELLED;
} else if (!request->has_param() ||
!request->param().skip_cancelled_check()) {
EXPECT_FALSE(context->IsCancelled());
}
if (request->has_param() && request->param().echo_metadata_initially()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
for (const auto& metadatum : client_metadata) {
context->AddInitialMetadata(ToString(metadatum.first),
ToString(metadatum.second));
}
}
if (request->has_param() && request->param().echo_metadata()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
for (const auto& metadatum : client_metadata) {
context->AddTrailingMetadata(ToString(metadatum.first),
ToString(metadatum.second));
}
// Terminate rpc with error and debug info in trailer.
if (request->param().debug_info().stack_entries_size() ||
!request->param().debug_info().detail().empty()) {
grpc::string serialized_debug_info =
request->param().debug_info().SerializeAsString();
context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
return Status::CANCELLED;
}
}
if (request->has_param() &&
(request->param().expected_client_identity().length() > 0 ||
request->param().check_auth_context())) {
CheckServerAuthContext(context,
request->param().expected_transport_security_type(),
request->param().expected_client_identity());
}
if (request->has_param() && request->param().response_message_length() > 0) {
response->set_message(
grpc::string(request->param().response_message_length(), '\0'));
}
if (request->has_param() && request->param().echo_peer()) {
response->mutable_param()->set_peer(context->peer());
}
return Status::OK;
}
Status TestServiceImpl::CheckClientInitialMetadata(
ServerContext* context, const SimpleRequest* /*request*/,
SimpleResponse* /*response*/) {
EXPECT_EQ(MetadataMatchCount(context->client_metadata(),
kCheckClientInitialMetadataKey,
kCheckClientInitialMetadataVal),
1);
EXPECT_EQ(1u,
context->client_metadata().count(kCheckClientInitialMetadataKey));
return Status::OK;
}
// Unimplemented is left unimplemented to test the returned error.
Status TestServiceImpl::RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) {
// If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
// the server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
// any message from the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// reading messages from the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
// all the messages from the client
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
EchoRequest request;
response->set_message("");
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread([context] { ServerTryCancel(context); });
}
int num_msgs_read = 0;
while (reader->Read(&request)) {
++num_msgs_read;  // Count each message so the gpr_log below is accurate.
response->mutable_message()->append(request.message());
}
gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
// Return 'kNumResponseStreamMsgs' messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status TestServiceImpl::ResponseStream(ServerContext* context,
const EchoRequest* request,
ServerWriter<EchoResponse>* writer) {
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
// server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
// any messages to the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// writing messages to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
// all the messages to the client
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
int server_coalescing_api = GetIntValueFromMetadata(
kServerUseCoalescingApi, context->client_metadata(), 0);
int server_responses_to_send = GetIntValueFromMetadata(
kServerResponseStreamsToSend, context->client_metadata(),
kServerDefaultResponseStreamsToSend);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
EchoResponse response;
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread([context] { ServerTryCancel(context); });
}
for (int i = 0; i < server_responses_to_send; i++) {
response.set_message(request->message() + grpc::to_string(i));
if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
writer->WriteLast(response, WriteOptions());
} else {
writer->Write(response);
}
}
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
Status TestServiceImpl::BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
// server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
// writes any messages from/to the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// reading/writing messages from/to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
// reads/writes all messages from/to the client
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
EchoRequest request;
EchoResponse response;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread([context] { ServerTryCancel(context); });
}
// kServerFinishAfterNReads suggests after how many reads the server should
// write the last message and send status (coalesced using WriteLast).
int server_write_last = GetIntValueFromMetadata(
kServerFinishAfterNReads, context->client_metadata(), 0);
int read_counts = 0;
while (stream->Read(&request)) {
read_counts++;
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
if (read_counts == server_write_last) {
stream->WriteLast(response, WriteOptions());
} else {
stream->Write(response);
}
}
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
} // namespace internal
experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
experimental::CallbackServerContext* context, const EchoRequest* request,
@ -500,7 +212,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
error.error_message(), error.binary_error_details()));
return;
}
int server_try_cancel = GetIntValueFromMetadata(
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, ctx_->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel != DO_NOT_CANCEL) {
// Since this is a unary RPC, by the time this server handler is called,
@ -515,7 +227,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
}
gpr_log(GPR_DEBUG, "Request message was %s", req_->message().c_str());
resp_->set_message(req_->message());
MaybeEchoDeadline(ctx_, req_, resp_);
internal::MaybeEchoDeadline(ctx_, req_, resp_);
if (service_->host_) {
resp_->mutable_param()->set_host(*service_->host_);
}
@ -569,9 +281,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
if (req_->has_param() &&
(req_->param().expected_client_identity().length() > 0 ||
req_->param().check_auth_context())) {
CheckServerAuthContext(ctx_,
req_->param().expected_transport_security_type(),
req_->param().expected_client_identity());
internal::CheckServerAuthContext(
ctx_, req_->param().expected_transport_security_type(),
req_->param().expected_client_identity());
}
if (req_->has_param() && req_->param().response_message_length() > 0) {
resp_->set_message(
@ -615,9 +327,9 @@ CallbackTestServiceImpl::CheckClientInitialMetadata(
class Reactor : public ::grpc::experimental::ServerUnaryReactor {
public:
explicit Reactor(experimental::CallbackServerContext* ctx) {
EXPECT_EQ(MetadataMatchCount(ctx->client_metadata(),
kCheckClientInitialMetadataKey,
kCheckClientInitialMetadataVal),
EXPECT_EQ(internal::MetadataMatchCount(ctx->client_metadata(),
kCheckClientInitialMetadataKey,
kCheckClientInitialMetadataVal),
1);
EXPECT_EQ(ctx->client_metadata().count(kCheckClientInitialMetadataKey),
1u);
@ -640,10 +352,10 @@ CallbackTestServiceImpl::RequestStream(
// is cancelled while the server is reading messages from the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
// all the messages from the client
int server_try_cancel = GetIntValueFromMetadata(
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancelNonblocking(context);
internal::ServerTryCancelNonblocking(context);
// Don't need to provide a reactor since the RPC is canceled
return nullptr;
}
@ -684,7 +396,7 @@ CallbackTestServiceImpl::RequestStream(
return;
}
if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
ServerTryCancelNonblocking(ctx_);
internal::ServerTryCancelNonblocking(ctx_);
return;
}
FinishOnce(Status::OK);
@ -726,10 +438,10 @@ CallbackTestServiceImpl::ResponseStream(
// is cancelled while the server is writing messages to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
// all the messages to the client
int server_try_cancel = GetIntValueFromMetadata(
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancelNonblocking(context);
internal::ServerTryCancelNonblocking(context);
}
class Reactor
@ -738,9 +450,9 @@ CallbackTestServiceImpl::ResponseStream(
Reactor(experimental::CallbackServerContext* ctx,
const EchoRequest* request, int server_try_cancel)
: ctx_(ctx), request_(request), server_try_cancel_(server_try_cancel) {
server_coalescing_api_ = GetIntValueFromMetadata(
server_coalescing_api_ = internal::GetIntValueFromMetadata(
kServerUseCoalescingApi, ctx->client_metadata(), 0);
server_responses_to_send_ = GetIntValueFromMetadata(
server_responses_to_send_ = internal::GetIntValueFromMetadata(
kServerResponseStreamsToSend, ctx->client_metadata(),
kServerDefaultResponseStreamsToSend);
if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
@ -767,7 +479,7 @@ CallbackTestServiceImpl::ResponseStream(
} else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
// Let OnCancel recover this
} else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
ServerTryCancelNonblocking(ctx_);
internal::ServerTryCancelNonblocking(ctx_);
} else {
FinishOnce(Status::OK);
}
@ -825,12 +537,12 @@ CallbackTestServiceImpl::BidiStream(
// is cancelled while the server is reading/writing messages from/to the
// client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
// reads/writes all messages from/to the client
server_try_cancel_ = GetIntValueFromMetadata(
server_try_cancel_ = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, ctx->client_metadata(), DO_NOT_CANCEL);
server_write_last_ = GetIntValueFromMetadata(kServerFinishAfterNReads,
ctx->client_metadata(), 0);
server_write_last_ = internal::GetIntValueFromMetadata(
kServerFinishAfterNReads, ctx->client_metadata(), 0);
if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
ServerTryCancelNonblocking(ctx);
internal::ServerTryCancelNonblocking(ctx);
} else {
if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
ctx->TryCancel();
@ -870,7 +582,7 @@ CallbackTestServiceImpl::BidiStream(
if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
// Let OnCancel handle this
} else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
ServerTryCancelNonblocking(ctx_);
internal::ServerTryCancelNonblocking(ctx_);
} else {
FinishOnce(Status::OK);
}

@ -15,6 +15,7 @@
* limitations under the License.
*
*/
#ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
#define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
@ -23,9 +24,19 @@
#include <mutex>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>
#include <gtest/gtest.h>
#include <string>
#include <thread>
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/util/string_ref_helper.h"
using std::chrono::system_clock;
namespace grpc {
namespace testing {
@ -46,6 +57,36 @@ typedef enum {
CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase;
namespace internal {
// When echo_deadline is requested, the deadline seen in the ServerContext is
// set in the response, in seconds.
void MaybeEchoDeadline(experimental::ServerContextBase* context,
const EchoRequest* request, EchoResponse* response);
void CheckServerAuthContext(
const experimental::ServerContextBase* context,
const grpc::string& expected_transport_security_type,
const grpc::string& expected_client_identity);
// Returns the number of pairs in metadata that exactly match the given
// key-value pair. Returns -1 if the pair wasn't found.
int MetadataMatchCount(
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
const grpc::string& key, const grpc::string& value);
int GetIntValueFromMetadataHelper(
const char* key,
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
int default_value);
int GetIntValueFromMetadata(
const char* key,
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
int default_value);
void ServerTryCancel(ServerContext* context);
} // namespace internal
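These helpers back both the sync and callback service implementations; a typical call site, as seen throughout this diff, reads a test control knob out of the client's metadata:

// e.g. read the cancel mode the test client attached to its metadata,
// falling back to DO_NOT_CANCEL when the key is absent.
int server_try_cancel = internal::GetIntValueFromMetadata(
    kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);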
class TestServiceSignaller {
public:
void ClientWaitUntilRpcStarted() {
@ -75,32 +116,310 @@ class TestServiceSignaller {
bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
};
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
template <typename RpcService>
class TestMultipleServiceImpl : public RpcService {
public:
TestServiceImpl() : signal_client_(false), host_() {}
explicit TestServiceImpl(const grpc::string& host)
TestMultipleServiceImpl() : signal_client_(false), host_() {}
explicit TestMultipleServiceImpl(const grpc::string& host)
: signal_client_(false), host_(new grpc::string(host)) {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override;
EchoResponse* response) {
if (request->has_param() &&
request->param().server_notify_client_when_started()) {
signaller_.SignalClientThatRpcStarted();
signaller_.ServerWaitToContinue();
}
// A bit of sleep to make sure that short deadline tests fail
if (request->has_param() && request->param().server_sleep_us() > 0) {
gpr_sleep_until(
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(request->param().server_sleep_us(),
GPR_TIMESPAN)));
}
if (request->has_param() && request->param().server_die()) {
gpr_log(GPR_ERROR, "The request should not reach application handler.");
GPR_ASSERT(0);
}
if (request->has_param() && request->param().has_expected_error()) {
const auto& error = request->param().expected_error();
return Status(static_cast<StatusCode>(error.code()),
error.error_message(), error.binary_error_details());
}
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel > DO_NOT_CANCEL) {
// Since this is a unary RPC, by the time this server handler is called,
// the 'request' message is already read from the client. So the scenarios
// in server_try_cancel don't make much sense. Just cancel the RPC as long
// as server_try_cancel is not DO_NOT_CANCEL
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
response->set_message(request->message());
internal::MaybeEchoDeadline(context, request, response);
if (host_) {
response->mutable_param()->set_host(*host_);
}
if (request->has_param() && request->param().client_cancel_after_us()) {
{
std::unique_lock<std::mutex> lock(mu_);
signal_client_ = true;
}
while (!context->IsCancelled()) {
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(request->param().client_cancel_after_us(),
GPR_TIMESPAN)));
}
return Status::CANCELLED;
} else if (request->has_param() &&
request->param().server_cancel_after_us()) {
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)));
return Status::CANCELLED;
} else if (!request->has_param() ||
!request->param().skip_cancelled_check()) {
EXPECT_FALSE(context->IsCancelled());
}
if (request->has_param() && request->param().echo_metadata_initially()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
for (const auto& metadatum : client_metadata) {
context->AddInitialMetadata(ToString(metadatum.first),
ToString(metadatum.second));
}
}
if (request->has_param() && request->param().echo_metadata()) {
const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
context->client_metadata();
for (const auto& metadatum : client_metadata) {
context->AddTrailingMetadata(ToString(metadatum.first),
ToString(metadatum.second));
}
// Terminate rpc with error and debug info in trailer.
if (request->param().debug_info().stack_entries_size() ||
!request->param().debug_info().detail().empty()) {
grpc::string serialized_debug_info =
request->param().debug_info().SerializeAsString();
context->AddTrailingMetadata(kDebugInfoTrailerKey,
serialized_debug_info);
return Status::CANCELLED;
}
}
if (request->has_param() &&
(request->param().expected_client_identity().length() > 0 ||
request->param().check_auth_context())) {
internal::CheckServerAuthContext(
context, request->param().expected_transport_security_type(),
request->param().expected_client_identity());
}
if (request->has_param() &&
request->param().response_message_length() > 0) {
response->set_message(
grpc::string(request->param().response_message_length(), '\0'));
}
if (request->has_param() && request->param().echo_peer()) {
response->mutable_param()->set_peer(context->peer());
}
return Status::OK;
}
Status Echo1(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
return Echo(context, request, response);
}
Status Echo2(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
return Echo(context, request, response);
}
Status CheckClientInitialMetadata(ServerContext* context,
const SimpleRequest* request,
SimpleResponse* response) override;
const SimpleRequest* /*request*/,
SimpleResponse* /*response*/) {
EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
kCheckClientInitialMetadataKey,
kCheckClientInitialMetadataVal),
1);
EXPECT_EQ(1u,
context->client_metadata().count(kCheckClientInitialMetadataKey));
return Status::OK;
}
// Unimplemented is left unimplemented to test the returned error.
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) override;
EchoResponse* response) {
// If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
// the server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
// any message from the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// reading messages from the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
// all the messages from the client
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
EchoRequest request;
response->set_message("");
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread([context] { internal::ServerTryCancel(context); });
}
int num_msgs_read = 0;
while (reader->Read(&request)) {
++num_msgs_read;  // Count each message so the gpr_log below is accurate.
response->mutable_message()->append(request.message());
}
gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
// Return 'kNumResponseStreamMsgs' messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status ResponseStream(ServerContext* context, const EchoRequest* request,
ServerWriter<EchoResponse>* writer) override;
ServerWriter<EchoResponse>* writer) {
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
// server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
// any messages to the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// writing messages to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
// all the messages to the client
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
int server_coalescing_api = internal::GetIntValueFromMetadata(
kServerUseCoalescingApi, context->client_metadata(), 0);
int server_responses_to_send = internal::GetIntValueFromMetadata(
kServerResponseStreamsToSend, context->client_metadata(),
kServerDefaultResponseStreamsToSend);
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
EchoResponse response;
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread([context] { internal::ServerTryCancel(context); });
}
for (int i = 0; i < server_responses_to_send; i++) {
response.set_message(request->message() + grpc::to_string(i));
if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
writer->WriteLast(response, WriteOptions());
} else {
writer->Write(response);
}
}
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
Status BidiStream(ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
// If server_try_cancel is set in the metadata, the RPC is cancelled by the
// server by calling ServerContext::TryCancel() depending on the value:
// CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
// writes any messages from/to the client
// CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
// reading/writing messages from/to the client
// CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
// reads/writes all messages from/to the client
int server_try_cancel = internal::GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
EchoRequest request;
EchoResponse response;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
new std::thread([context] { internal::ServerTryCancel(context); });
}
// kServerFinishAfterNReads suggests after how many reads the server should
// write the last message and send status (coalesced using WriteLast).
int server_write_last = internal::GetIntValueFromMetadata(
kServerFinishAfterNReads, context->client_metadata(), 0);
int read_counts = 0;
while (stream->Read(&request)) {
read_counts++;
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
if (read_counts == server_write_last) {
stream->WriteLast(response, WriteOptions());
} else {
stream->Write(response);
}
}
if (server_try_cancel_thd != nullptr) {
server_try_cancel_thd->join();
delete server_try_cancel_thd;
return Status::CANCELLED;
}
if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
internal::ServerTryCancel(context);
return Status::CANCELLED;
}
return Status::OK;
}
// Unimplemented is left unimplemented to test the returned error.
bool signal_client() {
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
@ -156,6 +475,9 @@ class CallbackTestServiceImpl
std::unique_ptr<grpc::string> host_;
};
using TestServiceImpl =
TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>;
} // namespace testing
} // namespace grpc

@ -233,13 +233,14 @@ class CountedService : public ServiceType {
size_t response_count_ = 0;
};
using BackendService = CountedService<TestServiceImpl>;
using LrsService = CountedService<LoadReportingService::Service>;
const char g_kCallCredsMdKey[] = "Balancer should not ...";
const char g_kCallCredsMdValue[] = "... receive me";
class BackendServiceImpl : public BackendService {
template <typename RpcService>
class BackendServiceImpl
: public CountedService<TestMultipleServiceImpl<RpcService>> {
public:
BackendServiceImpl() {}
@ -252,13 +253,25 @@ class BackendServiceImpl : public BackendService {
if (call_credentials_entry != context->client_metadata().end()) {
EXPECT_EQ(call_credentials_entry->second, g_kCallCredsMdValue);
}
IncreaseRequestCount();
const auto status = TestServiceImpl::Echo(context, request, response);
IncreaseResponseCount();
CountedService<TestMultipleServiceImpl<RpcService>>::IncreaseRequestCount();
const auto status =
TestMultipleServiceImpl<RpcService>::Echo(context, request, response);
CountedService<
TestMultipleServiceImpl<RpcService>>::IncreaseResponseCount();
AddClient(context->peer());
return status;
}
Status Echo1(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
return Echo(context, request, response);
}
Status Echo2(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
return Echo(context, request, response);
}
void Start() {}
void Shutdown() {}
@ -1146,7 +1159,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void ResetStub(int failover_timeout = 0,
const grpc::string& expected_targets = "",
int xds_resource_does_not_exist_timeout = 0) {
int xds_resource_does_not_exist_timeout = 0,
bool xds_routing_enabled = false) {
ChannelArguments args;
if (failover_timeout > 0) {
args.SetInt(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, failover_timeout);
@ -1155,6 +1169,9 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
args.SetInt(GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
xds_resource_does_not_exist_timeout);
}
if (xds_routing_enabled) {
args.SetInt(GRPC_ARG_XDS_ROUTING_ENABLED, 1);
}
// If the parent channel is using the fake resolver, we inject the
// response generator for the parent here, and then SetNextResolution()
// will inject the xds channel's response generator via the parent's
@ -1187,6 +1204,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
channel_creds->Unref();
channel_ = ::grpc::CreateCustomChannel(uri.str(), creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
stub1_ = grpc::testing::EchoTest1Service::NewStub(channel_);
stub2_ = grpc::testing::EchoTest2Service::NewStub(channel_);
}
void ResetBackendCounters() {
@ -1340,29 +1359,105 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return backend_ports;
}
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
bool wait_for_ready = false, bool server_fail = false) {
enum RpcService {
SERVICE_ECHO,
SERVICE_ECHO1,
SERVICE_ECHO2,
};
enum RpcMethod {
METHOD_ECHO,
METHOD_ECHO1,
METHOD_ECHO2,
};
struct RpcOptions {
RpcService service = SERVICE_ECHO;
RpcMethod method = METHOD_ECHO;
int timeout_ms = 1000;
bool wait_for_ready = false;
bool server_fail = false;
RpcOptions() {}
RpcOptions& set_rpc_service(RpcService rpc_service) {
service = rpc_service;
return *this;
}
RpcOptions& set_rpc_method(RpcMethod rpc_method) {
method = rpc_method;
return *this;
}
RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
timeout_ms = rpc_timeout_ms;
return *this;
}
RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
wait_for_ready = rpc_wait_for_ready;
return *this;
}
RpcOptions& set_server_fail(bool rpc_server_fail) {
server_fail = rpc_server_fail;
return *this;
}
};
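The fluent setters let each test state only the options it cares about; for example (mirroring the calls later in this file):

// Usage: 10 RPCs to Echo1 on EchoTest1Service with a 5s deadline,
// waiting for the channel to become ready first.
CheckRpcSendOk(10, RpcOptions()
                       .set_rpc_service(SERVICE_ECHO1)
                       .set_rpc_method(METHOD_ECHO1)
                       .set_timeout_ms(5000)
                       .set_wait_for_ready(true));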
template <typename Stub>
Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options,
ClientContext* context, EchoRequest& request,
EchoResponse* response) {
switch (rpc_options.method) {
case METHOD_ECHO:
return (*stub)->Echo(context, request, response);
case METHOD_ECHO1:
return (*stub)->Echo1(context, request, response);
case METHOD_ECHO2:
return (*stub)->Echo2(context, request, response);
}
}
Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
EchoResponse* response = nullptr) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
ClientContext context;
context.set_deadline(
grpc_timeout_milliseconds_to_deadline(rpc_options.timeout_ms));
if (rpc_options.wait_for_ready) context.set_wait_for_ready(true);
request.set_message(kRequestMessage_);
if (server_fail) {
if (rpc_options.server_fail) {
request.mutable_param()->mutable_expected_error()->set_code(
GRPC_STATUS_FAILED_PRECONDITION);
}
ClientContext context;
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
if (wait_for_ready) context.set_wait_for_ready(true);
Status status = stub_->Echo(&context, request, response);
Status status;
switch (rpc_options.service) {
case SERVICE_ECHO:
status =
SendRpcMethod(&stub_, rpc_options, &context, request, response);
break;
case SERVICE_ECHO1:
status =
SendRpcMethod(&stub1_, rpc_options, &context, request, response);
break;
case SERVICE_ECHO2:
status =
SendRpcMethod(&stub2_, rpc_options, &context, request, response);
break;
}
if (local_response) delete response;
return status;
}
void CheckRpcSendOk(const size_t times = 1, const int timeout_ms = 1000,
bool wait_for_ready = false) {
void CheckRpcSendOk(const size_t times = 1,
const RpcOptions& rpc_options = RpcOptions()) {
for (size_t i = 0; i < times; ++i) {
EchoResponse response;
const Status status = SendRpc(&response, timeout_ms, wait_for_ready);
const Status status = SendRpc(rpc_options, &response);
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
EXPECT_EQ(response.message(), kRequestMessage_);
@ -1371,7 +1466,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
void CheckRpcSendFailure(const size_t times = 1, bool server_fail = false) {
for (size_t i = 0; i < times; ++i) {
const Status status = SendRpc(nullptr, 1000, false, server_fail);
const Status status = SendRpc(RpcOptions().set_server_fail(server_fail));
EXPECT_FALSE(status.ok());
}
}
@ -1451,20 +1546,46 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
class BackendServerThread : public ServerThread {
public:
BackendServiceImpl* backend_service() { return &backend_service_; }
BackendServiceImpl<::grpc::testing::EchoTestService::Service>*
backend_service() {
return &backend_service_;
}
BackendServiceImpl<::grpc::testing::EchoTest1Service::Service>*
backend_service1() {
return &backend_service1_;
}
BackendServiceImpl<::grpc::testing::EchoTest2Service::Service>*
backend_service2() {
return &backend_service2_;
}
private:
void RegisterAllServices(ServerBuilder* builder) override {
builder->RegisterService(&backend_service_);
builder->RegisterService(&backend_service1_);
builder->RegisterService(&backend_service2_);
}
void StartAllServices() override { backend_service_.Start(); }
void StartAllServices() override {
backend_service_.Start();
backend_service1_.Start();
backend_service2_.Start();
}
void ShutdownAllServices() override { backend_service_.Shutdown(); }
void ShutdownAllServices() override {
backend_service_.Shutdown();
backend_service1_.Shutdown();
backend_service2_.Shutdown();
}
const char* Type() override { return "Backend"; }
BackendServiceImpl backend_service_;
BackendServiceImpl<::grpc::testing::EchoTestService::Service>
backend_service_;
BackendServiceImpl<::grpc::testing::EchoTest1Service::Service>
backend_service1_;
BackendServiceImpl<::grpc::testing::EchoTest2Service::Service>
backend_service2_;
};
class BalancerServerThread : public ServerThread {
@ -1503,6 +1624,8 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
const int client_load_reporting_interval_seconds_;
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<grpc::testing::EchoTest1Service::Stub> stub1_;
std::unique_ptr<grpc::testing::EchoTest2Service::Stub> stub2_;
std::vector<std::unique_ptr<BackendServerThread>> backends_;
std::vector<std::unique_ptr<BalancerServerThread>> balancers_;
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
@ -1560,9 +1683,9 @@ TEST_P(BasicTest, Vanilla) {
backends_[i]->backend_service()->request_count());
}
// Check LB policy name for the channel.
EXPECT_EQ(
(GetParam().use_xds_resolver() ? "cds_experimental" : "eds_experimental"),
channel_->GetLoadBalancingPolicyName());
EXPECT_EQ((GetParam().use_xds_resolver() ? "xds_routing_experimental"
: "eds_experimental"),
channel_->GetLoadBalancingPolicyName());
}
TEST_P(BasicTest, IgnoresUnhealthyEndpoints) {
@ -1639,7 +1762,8 @@ TEST_P(BasicTest, InitiallyEmptyServerlist) {
kDefaultResourceName));
const auto t0 = system_clock::now();
// Client will block: LB will initially send empty serverlist.
CheckRpcSendOk(1, kCallDeadlineMs, true /* wait_for_ready */);
CheckRpcSendOk(
1, RpcOptions().set_timeout_ms(kCallDeadlineMs).set_wait_for_ready(true));
const auto ellapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
system_clock::now() - t0);
@ -1687,8 +1811,7 @@ TEST_P(BasicTest, BackendsRestart) {
CheckRpcSendFailure();
// Restart all backends. RPCs should start succeeding again.
StartAllBackends();
CheckRpcSendOk(1 /* times */, 2000 /* timeout_ms */,
true /* wait_for_ready */);
CheckRpcSendOk(1, RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true));
}
using XdsResolverOnlyTest = BasicTest;
@ -2096,7 +2219,7 @@ TEST_P(LdsTest, ChooseLastRoute) {
}
// Tests that LDS client should send a NACK if route match has a non-empty
// prefix as the only route (default) in the LDS response.
TEST_P(LdsTest, RouteMatchHasNonemptyPrefix) {
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
@ -2113,6 +2236,247 @@ TEST_P(LdsTest, RouteMatchHasNonemptyPrefix) {
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a prefix
// string with no "/".
TEST_P(LdsTest, RouteMatchHasInvalidPrefixNonEmptyNoSlash) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("grpc.testing.EchoTest1Service");
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a prefix
// string that does not end with "/".
TEST_P(LdsTest, RouteMatchHasInvalidPrefixNoEndingSlash) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a prefix
// string that does not start with "/".
TEST_P(LdsTest, RouteMatchHasInvalidPrefixNoLeadingSlash) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("grpc.testing.EchoTest1Service/");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a prefix
// string with extra content outside of "/service/".
TEST_P(LdsTest, RouteMatchHasInvalidPrefixExtraContent) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/Echo1");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a prefix
// string "//".
TEST_P(LdsTest, RouteMatchHasInvalidPrefixNoContent) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("//");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
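Taken together, the prefix tests above pin down the validation rule the client is expected to enforce; roughly (a sketch of the implied rule, not the actual xds_api.cc code):

#include <string>

// A prefix is valid iff it is empty (default route) or has the exact form
// "/<service>/" with a single non-empty segment between the slashes.
bool PrefixIsValid(const std::string& prefix) {
  if (prefix.empty()) return true;        // default route matches everything
  if (prefix.size() < 3) return false;    // "//" has no service
  if (prefix.front() != '/' || prefix.back() != '/') return false;
  const std::string service = prefix.substr(1, prefix.size() - 2);
  return service.find('/') == std::string::npos;  // no extra content
}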
// Tests that LDS client should send a NACK if route match has a path
// that is empty.
TEST_P(LdsTest, RouteMatchHasInvalidPathEmptyPath) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
route1->mutable_match()->set_path("");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a path
// string that does not start with "/".
TEST_P(LdsTest, RouteMatchHasInvalidPathNoLeadingSlash) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
route1->mutable_match()->set_path("grpc.testing.EchoTest1Service/Echo1");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a path
// string that ends with "/".
TEST_P(LdsTest, RouteMatchHasInvalidPathEndsWithSlash) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
route1->mutable_match()->set_path("/grpc.testing.EchoTest1Service/Echo1/");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a path
// string that is missing the "/" between service and method.
TEST_P(LdsTest, RouteMatchHasInvalidPathMissingMiddleSlash) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
route1->mutable_match()->set_path("/grpc.testing.EchoTest1Service.Echo1");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a path
// string that is missing the service.
TEST_P(LdsTest, RouteMatchHasInvalidPathMissingService) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
route1->mutable_match()->set_path("//Echo1");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
// Tests that LDS client should send a NACK if route match has a path
// string that is missing the method.
TEST_P(LdsTest, RouteMatchHasInvalidPathMissingMethod) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
RouteConfiguration route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = route_config.mutable_virtual_hosts(0)->mutable_routes(0);
auto* default_route = route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
route1->mutable_match()->set_path("/grpc.testing.EchoTest1Service/");
balancers_[0]->ads_service()->SetLdsResource(
AdsServiceImpl::BuildListener(route_config), kDefaultResourceName);
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
CheckRpcSendFailure();
EXPECT_EQ(balancers_[0]->ads_service()->lds_response_state(),
AdsServiceImpl::NACKED);
}
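The path tests are the mirror image; the implied rule, again as a sketch rather than the actual xds_api.cc code:

#include <string>

// A path is valid iff it has the exact form "/<service>/<method>" with
// both segments non-empty and no trailing "/".
bool PathIsValid(const std::string& path) {
  if (path.empty() || path.front() != '/') return false;
  const size_t second_slash = path.find('/', 1);
  if (second_slash == std::string::npos) return false;  // missing "/<method>"
  const std::string service = path.substr(1, second_slash - 1);
  const std::string method = path.substr(second_slash + 1);
  return !service.empty() && !method.empty() &&
         method.find('/') == std::string::npos;  // no trailing "/"
}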
// Tests that LDS client should send a NACK if route has an action other than
// RouteAction in the LDS response.
TEST_P(LdsTest, RouteHasNoRouteAction) {
@ -2128,6 +2492,9 @@ TEST_P(LdsTest, RouteHasNoRouteAction) {
AdsServiceImpl::NACKED);
}
// TODO(donnadionne): Add more invalid config tests to cover all errors in
// xds_api.cc.
// Tests that LDS client should send a NACK if RouteAction has a
// cluster_specifier other than cluster in the LDS response.
TEST_P(LdsTest, RouteActionHasNoCluster) {
@ -2155,6 +2522,160 @@ TEST_P(LdsTest, Timeout) {
CheckRpcSendFailure();
}
// Tests that the LDS client chooses the default route (with no matcher
// specified) after failing to find a match with the previous routes.
TEST_P(LdsTest, XdsRoutingPathMatching) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "new_cluster_2";
const size_t kNumEcho1Rpcs = 10;
const size_t kNumEcho2Rpcs = 20;
const size_t kNumEchoRpcs = 30;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 2)},
});
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(2, 3)},
});
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(3, 4)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args1, kNewCluster1Name),
kNewCluster1Name);
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2, kNewCluster2Name),
kNewCluster2Name);
// Populate new CDS resources.
Cluster new_cluster1 = balancers_[0]->ads_service()->default_cluster();
new_cluster1.set_name(kNewCluster1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1, kNewCluster1Name);
Cluster new_cluster2 = balancers_[0]->ads_service()->default_cluster();
new_cluster2.set_name(kNewCluster2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2, kNewCluster2Name);
// Populate new route configuration for LDS.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_path("/grpc.testing.EchoTest1Service/Echo1");
route1->mutable_route()->set_cluster(kNewCluster1Name);
auto* route2 = new_route_config.mutable_virtual_hosts(0)->add_routes();
route2->mutable_match()->set_path("/grpc.testing.EchoTest2Service/Echo2");
route2->mutable_route()->set_cluster(kNewCluster2Name);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
Listener listener =
balancers_[0]->ads_service()->BuildListener(new_route_config);
balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);
WaitForAllBackends(0, 2);
CheckRpcSendOk(kNumEchoRpcs, RpcOptions().set_wait_for_ready(true));
CheckRpcSendOk(kNumEcho1Rpcs, RpcOptions()
.set_rpc_service(SERVICE_ECHO1)
.set_rpc_method(METHOD_ECHO1)
.set_wait_for_ready(true));
CheckRpcSendOk(kNumEcho2Rpcs, RpcOptions()
.set_rpc_service(SERVICE_ECHO2)
.set_rpc_method(METHOD_ECHO2)
.set_wait_for_ready(true));
// Make sure RPCs all go to the correct backend.
for (size_t i = 0; i < 2; ++i) {
EXPECT_EQ(kNumEchoRpcs / 2,
backends_[i]->backend_service()->request_count());
EXPECT_EQ(0, backends_[i]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[i]->backend_service2()->request_count());
}
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
EXPECT_EQ(kNumEcho1Rpcs, backends_[2]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service2()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service1()->request_count());
EXPECT_EQ(kNumEcho2Rpcs, backends_[3]->backend_service2()->request_count());
}
TEST_P(LdsTest, XdsRoutingPrefixMatching) {
ResetStub(/*failover_timeout=*/0,
/*expected_targets=*/"",
/*xds_resource_does_not_exist_timeout=*/0,
/*xds_routing_enabled=*/true);
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewCluster2Name = "new_cluster_2";
const size_t kNumEcho1Rpcs = 10;
const size_t kNumEcho2Rpcs = 20;
const size_t kNumEchoRpcs = 30;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Populate new EDS resources.
AdsServiceImpl::EdsResourceArgs args({
{"locality0", GetBackendPorts(0, 2)},
});
AdsServiceImpl::EdsResourceArgs args1({
{"locality0", GetBackendPorts(2, 3)},
});
AdsServiceImpl::EdsResourceArgs args2({
{"locality0", GetBackendPorts(3, 4)},
});
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args1, kNewCluster1Name),
kNewCluster1Name);
balancers_[0]->ads_service()->SetEdsResource(
AdsServiceImpl::BuildEdsResource(args2, kNewCluster2Name),
kNewCluster2Name);
// Populate new CDS resources.
Cluster new_cluster1 = balancers_[0]->ads_service()->default_cluster();
new_cluster1.set_name(kNewCluster1Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster1, kNewCluster1Name);
Cluster new_cluster2 = balancers_[0]->ads_service()->default_cluster();
new_cluster2.set_name(kNewCluster2Name);
balancers_[0]->ads_service()->SetCdsResource(new_cluster2, kNewCluster2Name);
// Populate new route configuration for LDS.
RouteConfiguration new_route_config =
balancers_[0]->ads_service()->default_route_config();
auto* route1 = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
route1->mutable_match()->set_prefix("/grpc.testing.EchoTest1Service/");
route1->mutable_route()->set_cluster(kNewCluster1Name);
auto* route2 = new_route_config.mutable_virtual_hosts(0)->add_routes();
route2->mutable_match()->set_prefix("/grpc.testing.EchoTest2Service/");
route2->mutable_route()->set_cluster(kNewCluster2Name);
auto* default_route = new_route_config.mutable_virtual_hosts(0)->add_routes();
default_route->mutable_match()->set_prefix("");
default_route->mutable_route()->set_cluster(kDefaultResourceName);
Listener listener =
balancers_[0]->ads_service()->BuildListener(new_route_config);
balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName);
WaitForAllBackends(0, 2);
CheckRpcSendOk(kNumEchoRpcs, RpcOptions().set_wait_for_ready(true));
CheckRpcSendOk(
kNumEcho1Rpcs,
RpcOptions().set_rpc_service(SERVICE_ECHO1).set_wait_for_ready(true));
CheckRpcSendOk(
kNumEcho2Rpcs,
RpcOptions().set_rpc_service(SERVICE_ECHO2).set_wait_for_ready(true));
// Make sure RPCs all go to the correct backend.
for (size_t i = 0; i < 2; ++i) {
EXPECT_EQ(kNumEchoRpcs / 2,
backends_[i]->backend_service()->request_count());
EXPECT_EQ(0, backends_[i]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[i]->backend_service2()->request_count());
}
EXPECT_EQ(0, backends_[2]->backend_service()->request_count());
EXPECT_EQ(kNumEcho1Rpcs, backends_[2]->backend_service1()->request_count());
EXPECT_EQ(0, backends_[2]->backend_service2()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service()->request_count());
EXPECT_EQ(0, backends_[3]->backend_service1()->request_count());
EXPECT_EQ(kNumEcho2Rpcs, backends_[3]->backend_service2()->request_count());
}
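Both routing tests depend on first-match-wins selection, with the empty-prefix default route placed last; a sketch of that selection logic (an assumption about the policy, not its actual code):

#include <string>
#include <vector>

// First route whose matcher accepts the RPC path wins; the empty prefix
// matches everything, so the default route must come last.
struct RouteEntry {
  bool is_path;         // true: exact path match; false: prefix match
  std::string matcher;  // e.g. "/grpc.testing.EchoTest1Service/"
  std::string cluster;
};
std::string PickCluster(const std::vector<RouteEntry>& routes,
                        const std::string& rpc_path) {
  for (const RouteEntry& r : routes) {
    const bool matched =
        r.is_path ? rpc_path == r.matcher
                  : rpc_path.compare(0, r.matcher.size(), r.matcher) == 0;
    if (matched) return r.cluster;
  }
  return "";  // unreachable when a default ""-prefix route is present
}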
using RdsTest = BasicTest;
// Tests that RDS client should send an ACK upon correct RDS response.
@ -2228,7 +2749,7 @@ TEST_P(RdsTest, ChooseLastRoute) {
}
// Tests that RDS client should send a NACK if route match has a non-empty
// prefix as the only route (default) in the RDS response.
TEST_P(RdsTest, RouteMatchHasNonemptyPrefix) {
balancers_[0]->ads_service()->SetLdsToUseDynamicRds();
RouteConfiguration route_config =
@ -2236,7 +2757,7 @@ TEST_P(RdsTest, RouteMatchHasNonemptyPrefix) {
route_config.mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_match()
->set_prefix("nonempty_prefix");
->set_prefix("/nonempty_prefix/");
balancers_[0]->ads_service()->SetRdsResource(route_config,
kDefaultResourceName);
SetNextResolution({});
@ -2849,7 +3370,7 @@ TEST_P(DropTest, Vanilla) {
size_t num_drops = 0;
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -2889,7 +3410,7 @@ TEST_P(DropTest, DropPerHundred) {
size_t num_drops = 0;
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -2928,7 +3449,7 @@ TEST_P(DropTest, DropPerTenThousand) {
size_t num_drops = 0;
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -2971,7 +3492,7 @@ TEST_P(DropTest, Update) {
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -3003,7 +3524,7 @@ TEST_P(DropTest, Update) {
size_t num_rpcs = kNumRpcs;
while (seen_drop_rate < kDropRateThreshold) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
++num_rpcs;
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
@@ -3020,7 +3541,7 @@ TEST_P(DropTest, Update) {
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;
@@ -3057,7 +3578,7 @@ TEST_P(DropTest, DropAll) {
// Send kNumRpcs RPCs and all of them are dropped.
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
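Each dropped call surfaces as UNAVAILABLE with the exact message asserted
above. The drop decision itself is probabilistic: the EDS policy config
carries per-category drop rates (the DropPerHundred and DropPerTenThousand
variants just use different denominators), and every pick rolls against
that rate. A rough sketch of such a check, normalized to parts per million;
the function name and RNG choice here are illustrative, not the policy's
actual code:

#include <cstdint>
#include <random>

// Returns true if this pick should be dropped, given a drop rate
// expressed in parts per million (e.g. 100000 for a 10% drop rate).
bool ShouldDrop(uint32_t drop_per_million, std::mt19937* rng) {
  std::uniform_int_distribution<uint32_t> dist(0, 999999);
  return dist(*rng) < drop_per_million;
}

This is also why DropTest.Update measures a seen_drop_rate against a
threshold rather than an exact count: over kNumRpcs calls the observed rate
only converges on the configured rate.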
@@ -3452,7 +3973,7 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
// Send kNumRpcs RPCs and count the drops.
for (size_t i = 0; i < kNumRpcs; ++i) {
EchoResponse response;
const Status status = SendRpc(&response);
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
++num_drops;

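All of the call sites above now pass an explicit RpcOptions as the first
SendRpc argument. The helper's definition lives in the test_service_impl
and xds_end2end_test changes not shown here; the following is a plausible
reconstruction, where only set_rpc_service and set_wait_for_ready (and the
SERVICE_ECHO* values) are attested by the call sites and everything else is
assumed:

// Sketch of the options the tests chain onto RpcOptions(); defaults and
// the full field set are assumptions.
enum RpcService { SERVICE_ECHO, SERVICE_ECHO1, SERVICE_ECHO2 };

struct RpcOptions {
  RpcService rpc_service = SERVICE_ECHO;
  bool wait_for_ready = false;

  RpcOptions& set_rpc_service(RpcService service) {
    rpc_service = service;
    return *this;
  }
  RpcOptions& set_wait_for_ready(bool value) {
    wait_for_ready = value;
    return *this;
  }
};

// SendRpc(options, &response) would then pick the EchoTestService,
// EchoTest1Service, or EchoTest2Service stub from options.rpc_service and
// set the corresponding wait_for_ready call option before issuing the RPC.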
@@ -48,29 +48,35 @@ using grpc::testing::EchoResponse;
#define ECHO_TEST_SERVICE_SUMMARY \
"Echo\n" \
"Echo1\n" \
"Echo2\n" \
"CheckClientInitialMetadata\n" \
"RequestStream\n" \
"ResponseStream\n" \
"BidiStream\n" \
"Unimplemented\n"
#define ECHO_TEST_SERVICE_DESCRIPTION \
"filename: src/proto/grpc/testing/echo.proto\n" \
"package: grpc.testing;\n" \
"service EchoTestService {\n" \
" rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \
"{}\n" \
" rpc CheckClientInitialMetadata(grpc.testing.SimpleRequest) returns " \
"(grpc.testing.SimpleResponse) {}\n" \
" rpc RequestStream(stream grpc.testing.EchoRequest) returns " \
"(grpc.testing.EchoResponse) {}\n" \
" rpc ResponseStream(grpc.testing.EchoRequest) returns (stream " \
"grpc.testing.EchoResponse) {}\n" \
" rpc BidiStream(stream grpc.testing.EchoRequest) returns (stream " \
"grpc.testing.EchoResponse) {}\n" \
" rpc Unimplemented(grpc.testing.EchoRequest) returns " \
"(grpc.testing.EchoResponse) {}\n" \
"}\n" \
#define ECHO_TEST_SERVICE_DESCRIPTION \
"filename: src/proto/grpc/testing/echo.proto\n" \
"package: grpc.testing;\n" \
"service EchoTestService {\n" \
" rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \
"{}\n" \
" rpc Echo1(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \
"{}\n" \
" rpc Echo2(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \
"{}\n" \
" rpc CheckClientInitialMetadata(grpc.testing.SimpleRequest) returns " \
"(grpc.testing.SimpleResponse) {}\n" \
" rpc RequestStream(stream grpc.testing.EchoRequest) returns " \
"(grpc.testing.EchoResponse) {}\n" \
" rpc ResponseStream(grpc.testing.EchoRequest) returns (stream " \
"grpc.testing.EchoResponse) {}\n" \
" rpc BidiStream(stream grpc.testing.EchoRequest) returns (stream " \
"grpc.testing.EchoResponse) {}\n" \
" rpc Unimplemented(grpc.testing.EchoRequest) returns " \
"(grpc.testing.EchoResponse) {}\n" \
"}\n" \
"\n"
#define ECHO_METHOD_DESCRIPTION \
@@ -1103,7 +1109,8 @@ TEST_F(GrpcToolTest, CallCommandWithMetadata) {
TEST_F(GrpcToolTest, CallCommandWithBadMetadata) {
// Test input "grpc_cli call localhost:10000 Echo "message: 'Hello'"
const char* argv[] = {"grpc_cli", "call", "localhost:10000", "Echo",
const char* argv[] = {"grpc_cli", "call", "localhost:10000",
"grpc.testing.EchoTestService.Echo",
"message: 'Hello'"};
FLAGS_protofiles = "src/proto/grpc/testing/echo.proto";
char* test_srcdir = gpr_getenv("TEST_SRCDIR");

@@ -1116,6 +1116,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/lb_policy_registry.h \

@@ -913,6 +913,7 @@ src/core/ext/filters/client_channel/lb_policy/xds/cds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/eds.cc \
src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc \
src/core/ext/filters/client_channel/lb_policy/xds/xds.h \
src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc \
src/core/ext/filters/client_channel/lb_policy_factory.h \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
src/core/ext/filters/client_channel/lb_policy_registry.h \
