Merge pull request #18052 from mhaidrygoog/xds_work

Support for LocalityMap for XDS LB policy
pull/18588/head
Moiz Haidry 6 years ago committed by GitHub
commit 5e26c4339d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 4
      BUILD.gn
  3. 40
      CMakeLists.txt
  4. 48
      Makefile
  5. 18
      build.yaml
  6. 4
      gRPC-C++.podspec
  7. 4
      gRPC-Core.podspec
  8. 2
      grpc.gemspec
  9. 2
      package.xml
  10. 535
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  11. 419
      src/core/lib/gprpp/map.h
  12. 38
      src/core/lib/gprpp/pair.h
  13. 13
      test/core/gprpp/BUILD
  14. 409
      test/core/gprpp/map_test.cc
  15. 10
      test/core/util/BUILD
  16. 2
      tools/doxygen/Doxyfile.c++.internal
  17. 2
      tools/doxygen/Doxyfile.core.internal
  18. 25
      tools/run_tests/generated/sources_and_headers.json
  19. 24
      tools/run_tests/generated/tests.json

@ -581,8 +581,10 @@ grpc_cc_library(
"src/core/lib/gprpp/abstract.h", "src/core/lib/gprpp/abstract.h",
"src/core/lib/gprpp/fork.h", "src/core/lib/gprpp/fork.h",
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h", "src/core/lib/profiling/timers.h",
], ],

@ -184,8 +184,10 @@ config("grpc_config") {
"src/core/lib/gprpp/fork.cc", "src/core/lib/gprpp/fork.cc",
"src/core/lib/gprpp/fork.h", "src/core/lib/gprpp/fork.h",
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/gprpp/thd_posix.cc", "src/core/lib/gprpp/thd_posix.cc",
"src/core/lib/gprpp/thd_windows.cc", "src/core/lib/gprpp/thd_windows.cc",
@ -1146,10 +1148,12 @@ config("grpc_config") {
"src/core/lib/gprpp/fork.h", "src/core/lib/gprpp/fork.h",
"src/core/lib/gprpp/inlined_vector.h", "src/core/lib/gprpp/inlined_vector.h",
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/optional.h", "src/core/lib/gprpp/optional.h",
"src/core/lib/gprpp/orphanable.h", "src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/ref_counted.h", "src/core/lib/gprpp/ref_counted.h",
"src/core/lib/gprpp/ref_counted_ptr.h", "src/core/lib/gprpp/ref_counted_ptr.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",

@ -631,6 +631,7 @@ add_dependencies(buildtests_cxx generic_end2end_test)
add_dependencies(buildtests_cxx golden_file_test) add_dependencies(buildtests_cxx golden_file_test)
add_dependencies(buildtests_cxx grpc_alts_credentials_options_test) add_dependencies(buildtests_cxx grpc_alts_credentials_options_test)
add_dependencies(buildtests_cxx grpc_cli) add_dependencies(buildtests_cxx grpc_cli)
add_dependencies(buildtests_cxx grpc_core_map_test)
add_dependencies(buildtests_cxx grpc_linux_system_roots_test) add_dependencies(buildtests_cxx grpc_linux_system_roots_test)
add_dependencies(buildtests_cxx grpc_tool_test) add_dependencies(buildtests_cxx grpc_tool_test)
add_dependencies(buildtests_cxx grpclb_api_test) add_dependencies(buildtests_cxx grpclb_api_test)
@ -13465,6 +13466,45 @@ target_link_libraries(grpc_cli
) )
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(grpc_core_map_test
test/core/gprpp/map_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(grpc_core_map_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(grpc_core_map_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc++
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS) endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_CODEGEN) if (gRPC_BUILD_CODEGEN)

@ -1209,6 +1209,7 @@ generic_end2end_test: $(BINDIR)/$(CONFIG)/generic_end2end_test
golden_file_test: $(BINDIR)/$(CONFIG)/golden_file_test golden_file_test: $(BINDIR)/$(CONFIG)/golden_file_test
grpc_alts_credentials_options_test: $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test grpc_alts_credentials_options_test: $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test
grpc_cli: $(BINDIR)/$(CONFIG)/grpc_cli grpc_cli: $(BINDIR)/$(CONFIG)/grpc_cli
grpc_core_map_test: $(BINDIR)/$(CONFIG)/grpc_core_map_test
grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin
grpc_csharp_plugin: $(BINDIR)/$(CONFIG)/grpc_csharp_plugin grpc_csharp_plugin: $(BINDIR)/$(CONFIG)/grpc_csharp_plugin
grpc_linux_system_roots_test: $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test grpc_linux_system_roots_test: $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test
@ -1680,6 +1681,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/golden_file_test \ $(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \ $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
$(BINDIR)/$(CONFIG)/grpc_cli \ $(BINDIR)/$(CONFIG)/grpc_cli \
$(BINDIR)/$(CONFIG)/grpc_core_map_test \
$(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \ $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \ $(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \ $(BINDIR)/$(CONFIG)/grpclb_api_test \
@ -1821,6 +1823,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/golden_file_test \ $(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \ $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test \
$(BINDIR)/$(CONFIG)/grpc_cli \ $(BINDIR)/$(CONFIG)/grpc_cli \
$(BINDIR)/$(CONFIG)/grpc_core_map_test \
$(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \ $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \ $(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \ $(BINDIR)/$(CONFIG)/grpclb_api_test \
@ -2311,6 +2314,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/golden_file_test || ( echo test golden_file_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/golden_file_test || ( echo test golden_file_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_alts_credentials_options_test" $(E) "[RUN] Testing grpc_alts_credentials_options_test"
$(Q) $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test || ( echo test grpc_alts_credentials_options_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/grpc_alts_credentials_options_test || ( echo test grpc_alts_credentials_options_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_core_map_test"
$(Q) $(BINDIR)/$(CONFIG)/grpc_core_map_test || ( echo test grpc_core_map_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_linux_system_roots_test" $(E) "[RUN] Testing grpc_linux_system_roots_test"
$(Q) $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test || ( echo test grpc_linux_system_roots_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/grpc_linux_system_roots_test || ( echo test grpc_linux_system_roots_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_tool_test" $(E) "[RUN] Testing grpc_tool_test"
@ -16431,6 +16436,49 @@ endif
endif endif
GRPC_CORE_MAP_TEST_SRC = \
test/core/gprpp/map_test.cc \
GRPC_CORE_MAP_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_CORE_MAP_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/grpc_core_map_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/grpc_core_map_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/grpc_core_map_test: $(PROTOBUF_DEP) $(GRPC_CORE_MAP_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(GRPC_CORE_MAP_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_core_map_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/gprpp/map_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_grpc_core_map_test: $(GRPC_CORE_MAP_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(GRPC_CORE_MAP_TEST_OBJS:.o=.dep)
endif
endif
GRPC_CPP_PLUGIN_SRC = \ GRPC_CPP_PLUGIN_SRC = \
src/compiler/cpp_plugin.cc \ src/compiler/cpp_plugin.cc \

@ -194,8 +194,10 @@ filegroups:
- src/core/lib/gprpp/atomic.h - src/core/lib/gprpp/atomic.h
- src/core/lib/gprpp/fork.h - src/core/lib/gprpp/fork.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/map.h
- src/core/lib/gprpp/memory.h - src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mutex_lock.h - src/core/lib/gprpp/mutex_lock.h
- src/core/lib/gprpp/pair.h
- src/core/lib/gprpp/thd.h - src/core/lib/gprpp/thd.h
- src/core/lib/profiling/timers.h - src/core/lib/profiling/timers.h
uses: uses:
@ -4739,6 +4741,22 @@ targets:
- grpc - grpc
- gpr - gpr
- grpc++_test_config - grpc++_test_config
- name: grpc_core_map_test
gtest: true
build: test
language: c++
headers:
- test/core/gprpp/map_tester.h
src:
- test/core/gprpp/map_test.cc
deps:
- grpc_test_util
- grpc++
- grpc
- gpr
uses:
- grpc++_test
uses_polling: false
- name: grpc_cpp_plugin - name: grpc_cpp_plugin
build: protoc build: protoc
language: c++ language: c++

@ -256,8 +256,10 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h',
@ -571,8 +573,10 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/lib/avl/avl.h', 'src/core/lib/avl/avl.h',

@ -208,8 +208,10 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/lib/gpr/alloc.cc', 'src/core/lib/gpr/alloc.cc',
@ -877,8 +879,10 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/atomic.h', 'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/fork.h', 'src/core/lib/gprpp/fork.h',
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h',

@ -102,8 +102,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/atomic.h ) s.files += %w( src/core/lib/gprpp/atomic.h )
s.files += %w( src/core/lib/gprpp/fork.h ) s.files += %w( src/core/lib/gprpp/fork.h )
s.files += %w( src/core/lib/gprpp/manual_constructor.h ) s.files += %w( src/core/lib/gprpp/manual_constructor.h )
s.files += %w( src/core/lib/gprpp/map.h )
s.files += %w( src/core/lib/gprpp/memory.h ) s.files += %w( src/core/lib/gprpp/memory.h )
s.files += %w( src/core/lib/gprpp/mutex_lock.h ) s.files += %w( src/core/lib/gprpp/mutex_lock.h )
s.files += %w( src/core/lib/gprpp/pair.h )
s.files += %w( src/core/lib/gprpp/thd.h ) s.files += %w( src/core/lib/gprpp/thd.h )
s.files += %w( src/core/lib/profiling/timers.h ) s.files += %w( src/core/lib/profiling/timers.h )
s.files += %w( src/core/lib/gpr/alloc.cc ) s.files += %w( src/core/lib/gpr/alloc.cc )

@ -107,8 +107,10 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/atomic.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/atomic.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/fork.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/fork.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/manual_constructor.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/manual_constructor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/memory.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/memory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/mutex_lock.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/mutex_lock.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/thd.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/thd.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/profiling/timers.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/profiling/timers.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" />

@ -68,7 +68,9 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include "include/grpc/support/alloc.h"
#include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h"
@ -85,6 +87,7 @@
#include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
@ -114,6 +117,7 @@ TraceFlag grpc_lb_xds_trace(false, "xds");
namespace { namespace {
constexpr char kXds[] = "xds_experimental"; constexpr char kXds[] = "xds_experimental";
constexpr char kDefaultLocalityName[] = "xds_default_locality";
class XdsLb : public LoadBalancingPolicy { class XdsLb : public LoadBalancingPolicy {
public: public:
@ -128,6 +132,9 @@ class XdsLb : public LoadBalancingPolicy {
channelz::ChildRefsList* child_channels) override; channelz::ChildRefsList* child_channels) override;
private: private:
struct LocalityServerlistEntry;
using LocalityList = InlinedVector<UniquePtr<LocalityServerlistEntry>, 1>;
/// Contains a channel to the LB server and all the data related to the /// Contains a channel to the LB server and all the data related to the
/// channel. /// channel.
class BalancerChannelState class BalancerChannelState
@ -266,25 +273,88 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsLbClientStats> client_stats_; RefCountedPtr<XdsLbClientStats> client_stats_;
}; };
class Helper : public ChannelControlHelper { class LocalityMap {
public: public:
explicit Helper(RefCountedPtr<XdsLb> parent) : parent_(std::move(parent)) {} class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
public:
explicit LocalityEntry(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) {
gpr_mu_init(&child_policy_mu_);
}
~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); }
void UpdateLocked(xds_grpclb_serverlist* serverlist,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args);
void ShutdownLocked();
void ResetBackoffLocked();
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels);
void Orphan() override;
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<LocalityEntry> entry)
: entry_(std::move(entry)) {}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const;
bool CalledByCurrentChild() const;
RefCountedPtr<LocalityEntry> entry_;
LoadBalancingPolicy* child_ = nullptr;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args);
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args);
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu child_policy_mu_;
RefCountedPtr<XdsLb> parent_;
};
Subchannel* CreateSubchannel(const grpc_channel_args& args) override; LocalityMap() { gpr_mu_init(&child_refs_mu_); }
grpc_channel* CreateChannel(const char* target, ~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); }
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void set_child(LoadBalancingPolicy* child) { child_ = child; } void UpdateLocked(const LocalityList& locality_list,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent);
void ShutdownLocked();
void ResetBackoffLocked();
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels);
private: private:
bool CalledByPendingChild() const; void PruneLocalities(const LocalityList& locality_list);
bool CalledByCurrentChild() const; Map<UniquePtr<char>, OrphanablePtr<LocalityEntry>, StringLess> map_;
// Lock held while filling child refs for all localities
// inside the map
gpr_mu child_refs_mu_;
};
RefCountedPtr<XdsLb> parent_; struct LocalityServerlistEntry {
LoadBalancingPolicy* child_ = nullptr; ~LocalityServerlistEntry() {
gpr_free(locality_name);
xds_grpclb_destroy_serverlist(serverlist);
}
char* locality_name;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
xds_grpclb_serverlist* serverlist;
}; };
~XdsLb(); ~XdsLb();
@ -309,12 +379,6 @@ class XdsLb : public LoadBalancingPolicy {
// Callback to enter fallback mode. // Callback to enter fallback mode.
static void OnFallbackTimerLocked(void* arg, grpc_error* error); static void OnFallbackTimerLocked(void* arg, grpc_error* error);
// Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked();
grpc_channel_args* CreateChildPolicyArgsLocked();
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args);
// Who the client is trying to communicate with. // Who the client is trying to communicate with.
const char* server_name_ = nullptr; const char* server_name_ = nullptr;
@ -338,10 +402,6 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for the LB call. 0 means no deadline. // Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0; int lb_call_timeout_ms_ = 0;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
xds_grpclb_serverlist* serverlist_ = nullptr;
// Timeout in milliseconds for before using fallback backend addresses. // Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback. // 0 means not using fallback.
RefCountedPtr<Config> fallback_policy_config_; RefCountedPtr<Config> fallback_policy_config_;
@ -355,11 +415,12 @@ class XdsLb : public LoadBalancingPolicy {
// The policy to use for the backends. // The policy to use for the backends.
RefCountedPtr<Config> child_policy_config_; RefCountedPtr<Config> child_policy_config_;
OrphanablePtr<LoadBalancingPolicy> child_policy_; // Map of policies to use in the backend
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; LocalityMap locality_map_;
// Lock held when modifying the value of child_policy_ or LocalityList locality_serverlist_;
// pending_child_policy_. // TODO(mhaidry) : Add a pending locality map that may be swapped with the
gpr_mu child_policy_mu_; // the current one when new localities in the pending map are ready
// to accept connections
}; };
// //
@ -378,105 +439,6 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
return result; return result;
} }
//
// XdsLb::Helper
//
bool XdsLb::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_child_policy_.get();
}
bool XdsLb::Helper::CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->child_policy_.get();
}
Subchannel* XdsLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* XdsLb::Helper::CreateChannel(const char* target,
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateChannel(target, args);
}
void XdsLb::Helper::UpdateState(grpc_connectivity_state state,
grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) {
if (parent_->shutting_down_) {
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p helper %p] pending child policy %p reports state=%s",
parent_.get(), this, parent_->pending_child_policy_.get(),
grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) {
GRPC_ERROR_UNREF(state_error);
return;
}
grpc_pollset_set_del_pollset_set(
parent_->child_policy_->interested_parties(),
parent_->interested_parties());
MutexLock lock(&parent_->child_policy_mu_);
parent_->child_policy_ = std::move(parent_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
GPR_ASSERT(parent_->lb_chand_ != nullptr);
RefCountedPtr<XdsLbClientStats> client_stats =
parent_->lb_chand_->lb_calld() == nullptr
? nullptr
: parent_->lb_chand_->lb_calld()->client_stats();
parent_->channel_control_helper()->UpdateState(
state, state_error,
UniquePtr<SubchannelPicker>(
New<Picker>(std::move(picker), std::move(client_stats))));
}
void XdsLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if (parent_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Re-resolution requested from the internal RR policy "
"(%p).",
parent_.get(), parent_->child_policy_.get());
}
GPR_ASSERT(parent_->lb_chand_ != nullptr);
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if (parent_->lb_chand_->lb_calld() == nullptr ||
!parent_->lb_chand_->lb_calld()->seen_initial_response()) {
parent_->channel_control_helper()->RequestReresolution();
}
}
// //
// serverlist parsing code // serverlist parsing code
// //
@ -951,7 +913,9 @@ void XdsLb::BalancerChannelState::BalancerCallState::
self.release(); self.release();
lb_calld->ScheduleNextClientLoadReportLocked(); lb_calld->ScheduleNextClientLoadReportLocked();
} }
if (xds_grpclb_serverlist_equals(xdslb_policy->serverlist_, serverlist)) { if (!xdslb_policy->locality_serverlist_.empty() &&
xds_grpclb_serverlist_equals(
xdslb_policy->locality_serverlist_[0]->serverlist, serverlist)) {
if (grpc_lb_xds_trace.enabled()) { if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xdslb %p] Incoming server list identical to current, " "[xdslb %p] Incoming server list identical to current, "
@ -960,21 +924,31 @@ void XdsLb::BalancerChannelState::BalancerCallState::
} }
xds_grpclb_destroy_serverlist(serverlist); xds_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */ } else { /* new serverlist */
if (xdslb_policy->serverlist_ != nullptr) { if (!xdslb_policy->locality_serverlist_.empty()) {
/* dispose of the old serverlist */ /* dispose of the old serverlist */
xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_); xds_grpclb_destroy_serverlist(
xdslb_policy->locality_serverlist_[0]->serverlist);
} else { } else {
/* or dispose of the fallback */ /* or dispose of the fallback */
xdslb_policy->fallback_backend_addresses_.reset(); xdslb_policy->fallback_backend_addresses_.reset();
if (xdslb_policy->fallback_timer_callback_pending_) { if (xdslb_policy->fallback_timer_callback_pending_) {
grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_);
} }
/* Initialize locality serverlist, currently the list only handles
* one child */
xdslb_policy->locality_serverlist_.emplace_back(
MakeUnique<LocalityServerlistEntry>());
xdslb_policy->locality_serverlist_[0]->locality_name =
static_cast<char*>(gpr_strdup(kDefaultLocalityName));
} }
// and update the copy in the XdsLb instance. This // and update the copy in the XdsLb instance. This
// serverlist instance will be destroyed either upon the next // serverlist instance will be destroyed either upon the next
// update or when the XdsLb instance is destroyed. // update or when the XdsLb instance is destroyed.
xdslb_policy->serverlist_ = serverlist; xdslb_policy->locality_serverlist_[0]->serverlist = serverlist;
xdslb_policy->CreateOrUpdateChildPolicyLocked(); xdslb_policy->locality_map_.UpdateLocked(
xdslb_policy->locality_serverlist_,
xdslb_policy->child_policy_config_.get(), xdslb_policy->args_,
xdslb_policy);
} }
} else { } else {
if (grpc_lb_xds_trace.enabled()) { if (grpc_lb_xds_trace.enabled()) {
@ -1112,9 +1086,11 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) {
// ctor and dtor // ctor and dtor
// //
XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)) { XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
locality_map_(),
locality_serverlist_() {
gpr_mu_init(&lb_chand_mu_); gpr_mu_init(&lb_chand_mu_);
gpr_mu_init(&child_policy_mu_);
// Record server name. // Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg); const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1141,10 +1117,7 @@ XdsLb::~XdsLb() {
gpr_mu_destroy(&lb_chand_mu_); gpr_mu_destroy(&lb_chand_mu_);
gpr_free((void*)server_name_); gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_); grpc_channel_args_destroy(args_);
if (serverlist_ != nullptr) { locality_serverlist_.clear();
xds_grpclb_destroy_serverlist(serverlist_);
}
gpr_mu_destroy(&child_policy_mu_);
} }
void XdsLb::ShutdownLocked() { void XdsLb::ShutdownLocked() {
@ -1152,19 +1125,7 @@ void XdsLb::ShutdownLocked() {
if (fallback_timer_callback_pending_) { if (fallback_timer_callback_pending_) {
grpc_timer_cancel(&lb_fallback_timer_); grpc_timer_cancel(&lb_fallback_timer_);
} }
if (child_policy_ != nullptr) { locality_map_.ShutdownLocked();
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
}
if (pending_child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_child_policy_->interested_parties(), interested_parties());
}
{
MutexLock lock(&child_policy_mu_);
child_policy_.reset();
pending_child_policy_.reset();
}
// We destroy the LB channel here instead of in our destructor because // We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to // destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be // OnBalancerChannelConnectivityChangedLocked(), and we need to be
@ -1187,30 +1148,13 @@ void XdsLb::ResetBackoffLocked() {
if (pending_lb_chand_ != nullptr) { if (pending_lb_chand_ != nullptr) {
grpc_channel_reset_connect_backoff(pending_lb_chand_->channel()); grpc_channel_reset_connect_backoff(pending_lb_chand_->channel());
} }
if (child_policy_ != nullptr) { locality_map_.ResetBackoffLocked();
child_policy_->ResetBackoffLocked();
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
}
} }
void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) { channelz::ChildRefsList* child_channels) {
{ // Delegate to the child_policy_ to fill the children subchannels.
// Delegate to the child_policy_ to fill the children subchannels. locality_map_.FillChildRefsForChannelz(child_subchannels, child_channels);
// This must be done holding child_policy_mu_, since this method does not
// run in the combiner.
MutexLock lock(&child_policy_mu_);
if (child_policy_ != nullptr) {
child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
if (pending_child_policy_ != nullptr) {
pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
MutexLock lock(&lb_chand_mu_); MutexLock lock(&lb_chand_mu_);
if (lb_chand_ != nullptr) { if (lb_chand_ != nullptr) {
grpc_core::channelz::ChannelNode* channel_node = grpc_core::channelz::ChannelNode* channel_node =
@ -1314,10 +1258,11 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
// have been created from a serverlist. // have been created from a serverlist.
// TODO(vpowar): Handle the fallback_address changes when we add support for // TODO(vpowar): Handle the fallback_address changes when we add support for
// fallback in xDS. // fallback in xDS.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked(); locality_map_.UpdateLocked(locality_serverlist_, child_policy_config_.get(),
args_, this);
// If this is the initial update, start the fallback timer. // If this is the initial update, start the fallback timer.
if (is_initial_update) { if (is_initial_update) {
if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && if (lb_fallback_timeout_ms_ > 0 && locality_serverlist_.empty() &&
!fallback_timer_callback_pending_) { !fallback_timer_callback_pending_) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
@ -1341,8 +1286,8 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
xdslb_policy->fallback_timer_callback_pending_ = false; xdslb_policy->fallback_timer_callback_pending_ = false;
// If we receive a serverlist after the timer fires but before this callback // If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back. // actually runs, don't fall back.
if (xdslb_policy->serverlist_ == nullptr && !xdslb_policy->shutting_down_ && if (xdslb_policy->locality_serverlist_.empty() &&
error == GRPC_ERROR_NONE) { !xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
if (grpc_lb_xds_trace.enabled()) { if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[xdslb %p] Fallback timer fired. Not using fallback backends", "[xdslb %p] Fallback timer fired. Not using fallback backends",
@ -1352,11 +1297,70 @@ void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
} }
// void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) {
// code for interacting with the child policy for (auto iter = map_.begin(); iter != map_.end();) {
// bool found = false;
for (size_t i = 0; i < locality_list.size(); i++) {
if (!gpr_stricmp(locality_list[i]->locality_name, iter->first.get())) {
found = true;
}
}
if (!found) { // Remove entries not present in the locality list
MutexLock lock(&child_refs_mu_);
iter = map_.erase(iter);
} else
iter++;
}
}
void XdsLb::LocalityMap::UpdateLocked(
const LocalityList& locality_serverlist,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent) {
if (parent->shutting_down_) return;
for (size_t i = 0; i < locality_serverlist.size(); i++) {
UniquePtr<char> locality_name(
gpr_strdup(locality_serverlist[i]->locality_name));
auto iter = map_.find(locality_name);
if (iter == map_.end()) {
OrphanablePtr<LocalityEntry> new_entry =
MakeOrphanable<LocalityEntry>(parent->Ref());
MutexLock lock(&child_refs_mu_);
iter = map_.emplace(std::move(locality_name), std::move(new_entry)).first;
}
// Don't create new child policies if not directed to
xds_grpclb_serverlist* serverlist =
parent->locality_serverlist_[i]->serverlist;
iter->second->UpdateLocked(serverlist, child_policy_config, args);
}
PruneLocalities(locality_serverlist);
}
void grpc_core::XdsLb::LocalityMap::ShutdownLocked() {
MutexLock lock(&child_refs_mu_);
map_.clear();
}
void grpc_core::XdsLb::LocalityMap::ResetBackoffLocked() {
for (auto iter = map_.begin(); iter != map_.end(); iter++) {
iter->second->ResetBackoffLocked();
}
}
void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
MutexLock lock(&child_refs_mu_);
for (auto iter = map_.begin(); iter != map_.end(); iter++) {
iter->second->FillChildRefsForChannelz(child_subchannels, child_channels);
}
}
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() { // Locality Entry child policy methods
grpc_channel_args*
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in) {
const grpc_arg args_to_add[] = { const grpc_arg args_to_add[] = {
// A channel arg indicating if the target is a backend inferred from a // A channel arg indicating if the target is a backend inferred from a
// grpclb load balancer. // grpclb load balancer.
@ -1368,15 +1372,16 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
grpc_channel_arg_integer_create( grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
}; };
return grpc_channel_args_copy_and_add(args_, args_to_add, return grpc_channel_args_copy_and_add(args_in, args_to_add,
GPR_ARRAY_SIZE(args_to_add)); GPR_ARRAY_SIZE(args_to_add));
} }
OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked( OrphanablePtr<LoadBalancingPolicy>
XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) { const char* name, const grpc_channel_args* args) {
Helper* helper = New<Helper>(Ref()); Helper* helper = New<Helper>(this->Ref());
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner(); lb_policy_args.combiner = parent_->combiner();
lb_policy_args.args = args; lb_policy_args.args = args;
lb_policy_args.channel_control_helper = lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(helper); UniquePtr<ChannelControlHelper>(helper);
@ -1397,22 +1402,27 @@ OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateChildPolicyLocked(
// child policy. This will make the child policy progress upon activity on xDS // child policy. This will make the child policy progress upon activity on xDS
// LB, which in turn is tied to the application's call. // LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties()); parent_->interested_parties());
return lb_policy; return lb_policy;
} }
void XdsLb::CreateOrUpdateChildPolicyLocked() { void XdsLb::LocalityMap::LocalityEntry::UpdateLocked(
if (shutting_down_) return; xds_grpclb_serverlist* serverlist,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args_in) {
if (parent_->shutting_down_) return;
// This should never be invoked if we do not have serverlist_, as fallback // This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin. // mode is disabled for xDS plugin.
// TODO(juanlishen): Change this as part of implementing fallback mode. // TODO(juanlishen): Change this as part of implementing fallback mode.
GPR_ASSERT(serverlist_ != nullptr); GPR_ASSERT(serverlist != nullptr);
GPR_ASSERT(serverlist_->num_servers > 0); GPR_ASSERT(serverlist->num_servers > 0);
// Construct update args. // Construct update args.
UpdateArgs update_args; UpdateArgs update_args;
update_args.addresses = ProcessServerlist(serverlist_); update_args.addresses = ProcessServerlist(serverlist);
update_args.config = child_policy_config_; update_args.config =
update_args.args = CreateChildPolicyArgsLocked(); child_policy_config == nullptr ? nullptr : child_policy_config->Ref();
update_args.args = CreateChildPolicyArgsLocked(args_in);
// If the child policy name changes, we need to create a new child // If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store // policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child // the new child policy in pending_child_policy_. Once the new child
@ -1464,9 +1474,9 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
// when the new child transitions into state READY. // when the new child transitions into state READY.
// TODO(juanlishen): If the child policy is not configured via service config, // TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer. // use whatever algorithm is specified by the balancer.
const char* child_policy_name = child_policy_config_ == nullptr const char* child_policy_name = child_policy_config == nullptr
? "round_robin" ? "round_robin"
: child_policy_config_->name(); : child_policy_config->name();
const bool create_policy = const bool create_policy =
// case 1 // case 1
child_policy_ == nullptr || child_policy_ == nullptr ||
@ -1512,6 +1522,145 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
policy_to_update->UpdateLocked(std::move(update_args)); policy_to_update->UpdateLocked(std::move(update_args));
} }
void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() {
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
parent_->interested_parties());
if (pending_child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_child_policy_->interested_parties(),
parent_->interested_parties());
}
{
MutexLock lock(&child_policy_mu_);
child_policy_.reset();
pending_child_policy_.reset();
}
}
void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked();
}
}
void XdsLb::LocalityMap::LocalityEntry::FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
MutexLock lock(&child_policy_mu_);
child_policy_->FillChildRefsForChannelz(child_subchannels, child_channels);
if (pending_child_policy_ != nullptr) {
pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
void XdsLb::LocalityMap::LocalityEntry::Orphan() {
ShutdownLocked();
Unref();
}
//
// LocalityEntry::Helper implementation
//
bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == entry_->pending_child_policy_.get();
}
bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
GPR_ASSERT(child_ != nullptr);
return child_ == entry_->child_policy_.get();
}
Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
const grpc_channel_args& args) {
if (entry_->parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return entry_->parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateChannel(
const char* target, const grpc_channel_args& args) {
if (entry_->parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return entry_->parent_->channel_control_helper()->CreateChannel(target, args);
}
void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
grpc_connectivity_state state, grpc_error* state_error,
UniquePtr<SubchannelPicker> picker) {
if (entry_->parent_->shutting_down_) {
GRPC_ERROR_UNREF(state_error);
return;
}
// If this request is from the pending child policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingChild()) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p helper %p] pending child policy %p reports state=%s",
entry_->parent_.get(), this, entry_->pending_child_policy_.get(),
grpc_connectivity_state_name(state));
}
if (state != GRPC_CHANNEL_READY) {
GRPC_ERROR_UNREF(state_error);
return;
}
grpc_pollset_set_del_pollset_set(
entry_->child_policy_->interested_parties(),
entry_->parent_->interested_parties());
MutexLock lock(&entry_->child_policy_mu_);
entry_->child_policy_ = std::move(entry_->pending_child_policy_);
} else if (!CalledByCurrentChild()) {
// This request is from an outdated child, so ignore it.
GRPC_ERROR_UNREF(state_error);
return;
}
// TODO(juanlishen): When in fallback mode, pass the child picker
// through without wrapping it. (Or maybe use a different helper for
// the fallback policy?)
GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
RefCountedPtr<XdsLbClientStats> client_stats =
entry_->parent_->lb_chand_->lb_calld() == nullptr
? nullptr
: entry_->parent_->lb_chand_->lb_calld()->client_stats();
entry_->parent_->channel_control_helper()->UpdateState(
state, state_error,
UniquePtr<SubchannelPicker>(
New<Picker>(std::move(picker), std::move(client_stats))));
}
void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {
if (entry_->parent_->shutting_down_) return;
// If there is a pending child policy, ignore re-resolution requests
// from the current child policy (or any outdated child).
if (entry_->pending_child_policy_ != nullptr && !CalledByPendingChild()) {
return;
}
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO,
"[xdslb %p] Re-resolution requested from the internal RR policy "
"(%p).",
entry_->parent_.get(), entry_->child_policy_.get());
}
GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr);
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if (entry_->parent_->lb_chand_->lb_calld() == nullptr ||
!entry_->parent_->lb_chand_->lb_calld()->seen_initial_response()) {
entry_->parent_->channel_control_helper()->RequestReresolution();
}
}
// //
// factory // factory
// //

@ -0,0 +1,419 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_MAP_H
#define GRPC_CORE_LIB_GPRPP_MAP_H
#include <grpc/support/port_platform.h>
#include <string.h>
#include <functional>
#include <iterator>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/pair.h"
namespace grpc_core {
struct StringLess {
bool operator()(const char* a, const char* b) const {
return strcmp(a, b) < 0;
}
bool operator()(const UniquePtr<char>& k1, const UniquePtr<char>& k2) {
return strcmp(k1.get(), k2.get()) < 0;
}
};
namespace testing {
class MapTest;
}
// Alternative map implementation for grpc_core
template <class Key, class T, class Compare = std::less<Key>>
class Map {
public:
typedef Key key_type;
typedef T mapped_type;
typedef Pair<key_type, mapped_type> value_type;
typedef Compare key_compare;
class iterator;
~Map() { clear(); }
T& operator[](key_type&& key);
T& operator[](const key_type& key);
iterator find(const key_type& k);
size_t erase(const key_type& key);
// Removes the current entry and points to the next one
iterator erase(iterator iter);
size_t size() { return size_; }
template <class... Args>
Pair<iterator, bool> emplace(Args&&... args);
Pair<iterator, bool> insert(value_type&& pair) {
return emplace(std::move(pair));
}
Pair<iterator, bool> insert(const value_type& pair) { return emplace(pair); }
bool empty() const { return root_ == nullptr; }
void clear() {
auto iter = begin();
while (!empty()) {
iter = erase(iter);
}
}
iterator begin() {
Entry* curr = GetMinEntry(root_);
return iterator(this, curr);
}
iterator end() { return iterator(this, nullptr); }
private:
friend class testing::MapTest;
struct Entry {
explicit Entry(value_type&& pair) : pair(std::move(pair)) {}
value_type pair;
Entry* left = nullptr;
Entry* right = nullptr;
int32_t height = 1;
};
static int32_t EntryHeight(const Entry* e) {
return e == nullptr ? 0 : e->height;
}
static Entry* GetMinEntry(Entry* e);
Entry* InOrderSuccessor(const Entry* e) const;
static Entry* RotateLeft(Entry* e);
static Entry* RotateRight(Entry* e);
static Entry* RebalanceTreeAfterInsertion(Entry* root, const key_type& k);
static Entry* RebalanceTreeAfterDeletion(Entry* root);
// Returns a pair with the first value being an iterator pointing to the
// inserted entry and the second value being the new root of the subtree
// after a rebalance
Pair<iterator, Entry*> InsertRecursive(Entry* root, value_type&& p);
static Entry* RemoveRecursive(Entry* root, const key_type& k);
// Return 0 if lhs = rhs
// 1 if lhs > rhs
// -1 if lhs < rhs
static int CompareKeys(const Key& lhs, const Key& rhs);
Entry* root_ = nullptr;
size_t size_ = 0;
};
template <class Key, class T, class Compare>
class Map<Key, T, Compare>::iterator
: public std::iterator<std::input_iterator_tag, Pair<Key, T>, int32_t,
Pair<Key, T>*, Pair<Key, T>&> {
public:
iterator(const iterator& iter) : curr_(iter.curr_), map_(iter.map_) {}
bool operator==(const iterator& rhs) const { return (curr_ == rhs.curr_); }
bool operator!=(const iterator& rhs) const { return (curr_ != rhs.curr_); }
iterator& operator++() {
curr_ = map_->InOrderSuccessor(curr_);
return *this;
}
iterator operator++(int) {
Entry* prev = curr_;
curr_ = map_->InOrderSuccessor(curr_);
return iterator(map_, prev);
}
iterator& operator=(const iterator& other) {
if (this != &other) {
this->curr_ = other.curr_;
this->map_ = other.map_;
}
return *this;
}
// operator*()
value_type& operator*() { return curr_->pair; }
const value_type& operator*() const { return curr_->pair; }
// operator->()
value_type* operator->() { return &curr_->pair; }
value_type const* operator->() const { return &curr_->pair; }
private:
friend class Map<key_type, mapped_type, key_compare>;
using GrpcMap = typename ::grpc_core::Map<Key, T, Compare>;
iterator(GrpcMap* map, Entry* curr) : curr_(curr), map_(map) {}
Entry* curr_;
GrpcMap* map_;
};
template <class Key, class T, class Compare>
T& Map<Key, T, Compare>::operator[](key_type&& key) {
auto iter = find(key);
if (iter == end()) {
return emplace(std::move(key), T()).first->second;
}
return iter->second;
}
template <class Key, class T, class Compare>
T& Map<Key, T, Compare>::operator[](const key_type& key) {
auto iter = find(key);
if (iter == end()) {
return emplace(key, T()).first->second;
}
return iter->second;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::iterator Map<Key, T, Compare>::find(
const key_type& k) {
Entry* iter = root_;
while (iter != nullptr) {
int comp = CompareKeys(iter->pair.first, k);
if (comp == 0) {
return iterator(this, iter);
} else if (comp < 0) {
iter = iter->right;
} else {
iter = iter->left;
}
}
return end();
}
template <class Key, class T, class Compare>
template <class... Args>
typename ::grpc_core::Pair<typename Map<Key, T, Compare>::iterator, bool>
Map<Key, T, Compare>::emplace(Args&&... args) {
Pair<key_type, mapped_type> pair(std::forward<Args>(args)...);
iterator ret = find(pair.first);
bool insertion = false;
if (ret == end()) {
Pair<iterator, Entry*> p = InsertRecursive(root_, std::move(pair));
root_ = p.second;
ret = p.first;
insertion = true;
size_++;
}
return MakePair(ret, insertion);
}
template <class Key, class T, class Compare>
size_t Map<Key, T, Compare>::erase(const key_type& key) {
iterator it = find(key);
if (it == end()) return 0;
erase(it);
return 1;
}
// TODO(mhaidry): Modify erase to use the iterator location
// to create an efficient erase method
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::iterator Map<Key, T, Compare>::erase(
iterator iter) {
if (iter == end()) return iter;
key_type& del_key = iter->first;
iter++;
root_ = RemoveRecursive(root_, del_key);
size_--;
return iter;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry* Map<Key, T, Compare>::InOrderSuccessor(
const Entry* e) const {
if (e->right != nullptr) {
return GetMinEntry(e->right);
}
Entry* successor = nullptr;
Entry* iter = root_;
while (iter != nullptr) {
int comp = CompareKeys(iter->pair.first, e->pair.first);
if (comp > 0) {
successor = iter;
iter = iter->left;
} else if (comp < 0) {
iter = iter->right;
} else
break;
}
return successor;
}
// Returns a pair with the first value being an iterator pointing to the
// inserted entry and the second value being the new root of the subtree
// after a rebalance
template <class Key, class T, class Compare>
typename ::grpc_core::Pair<typename Map<Key, T, Compare>::iterator,
typename Map<Key, T, Compare>::Entry*>
Map<Key, T, Compare>::InsertRecursive(Entry* root, value_type&& p) {
if (root == nullptr) {
Entry* e = New<Entry>(std::move(p));
return MakePair(iterator(this, e), e);
}
int comp = CompareKeys(root->pair.first, p.first);
if (comp > 0) {
Pair<iterator, Entry*> ret = InsertRecursive(root->left, std::move(p));
root->left = ret.second;
ret.second = RebalanceTreeAfterInsertion(root, ret.first->first);
return ret;
} else if (comp < 0) {
Pair<iterator, Entry*> ret = InsertRecursive(root->right, std::move(p));
root->right = ret.second;
ret.second = RebalanceTreeAfterInsertion(root, ret.first->first);
return ret;
} else {
root->pair = std::move(p);
return MakePair(iterator(this, root), root);
}
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry* Map<Key, T, Compare>::GetMinEntry(
Entry* e) {
if (e != nullptr) {
while (e->left != nullptr) {
e = e->left;
}
}
return e;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry* Map<Key, T, Compare>::RotateLeft(
Entry* e) {
Entry* rightChild = e->right;
Entry* rightLeftChild = rightChild->left;
rightChild->left = e;
e->right = rightLeftChild;
e->height = 1 + GPR_MAX(EntryHeight(e->left), EntryHeight(e->right));
rightChild->height = 1 + GPR_MAX(EntryHeight(rightChild->left),
EntryHeight(rightChild->right));
return rightChild;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry* Map<Key, T, Compare>::RotateRight(
Entry* e) {
Entry* leftChild = e->left;
Entry* leftRightChild = leftChild->right;
leftChild->right = e;
e->left = leftRightChild;
e->height = 1 + GPR_MAX(EntryHeight(e->left), EntryHeight(e->right));
leftChild->height =
1 + GPR_MAX(EntryHeight(leftChild->left), EntryHeight(leftChild->right));
return leftChild;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry*
Map<Key, T, Compare>::RebalanceTreeAfterInsertion(Entry* root,
const key_type& k) {
root->height = 1 + GPR_MAX(EntryHeight(root->left), EntryHeight(root->right));
int32_t heightDifference = EntryHeight(root->left) - EntryHeight(root->right);
if (heightDifference > 1 && CompareKeys(root->left->pair.first, k) > 0) {
return RotateRight(root);
}
if (heightDifference < -1 && CompareKeys(root->right->pair.first, k) < 0) {
return RotateLeft(root);
}
if (heightDifference > 1 && CompareKeys(root->left->pair.first, k) < 0) {
root->left = RotateLeft(root->left);
return RotateRight(root);
}
if (heightDifference < -1 && CompareKeys(root->right->pair.first, k) > 0) {
root->right = RotateRight(root->right);
return RotateLeft(root);
}
return root;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry*
Map<Key, T, Compare>::RebalanceTreeAfterDeletion(Entry* root) {
root->height = 1 + GPR_MAX(EntryHeight(root->left), EntryHeight(root->right));
int32_t heightDifference = EntryHeight(root->left) - EntryHeight(root->right);
if (heightDifference > 1) {
int leftHeightDifference =
EntryHeight(root->left->left) - EntryHeight(root->left->right);
if (leftHeightDifference < 0) {
root->left = RotateLeft(root->left);
}
return RotateRight(root);
}
if (heightDifference < -1) {
int rightHeightDifference =
EntryHeight(root->right->left) - EntryHeight(root->right->right);
if (rightHeightDifference > 0) {
root->right = RotateRight(root->right);
}
return RotateLeft(root);
}
return root;
}
template <class Key, class T, class Compare>
typename Map<Key, T, Compare>::Entry* Map<Key, T, Compare>::RemoveRecursive(
Entry* root, const key_type& k) {
if (root == nullptr) return root;
int comp = CompareKeys(root->pair.first, k);
if (comp > 0) {
root->left = RemoveRecursive(root->left, k);
} else if (comp < 0) {
root->right = RemoveRecursive(root->right, k);
} else {
Entry* ret;
if (root->left == nullptr) {
ret = root->right;
Delete(root);
return ret;
} else if (root->right == nullptr) {
ret = root->left;
Delete(root);
return ret;
} else {
ret = root->right;
while (ret->left != nullptr) {
ret = ret->left;
}
root->pair.swap(ret->pair);
root->right = RemoveRecursive(root->right, ret->pair.first);
}
}
return RebalanceTreeAfterDeletion(root);
}
template <class Key, class T, class Compare>
int Map<Key, T, Compare>::CompareKeys(const key_type& lhs,
const key_type& rhs) {
key_compare compare;
bool left_comparison = compare(lhs, rhs);
bool right_comparison = compare(rhs, lhs);
// Both values are equal
if (!left_comparison && !right_comparison) {
return 0;
}
return left_comparison ? -1 : 1;
}
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_MAP_H */

@ -0,0 +1,38 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_PAIR_H
#define GRPC_CORE_LIB_GPRPP_PAIR_H
#include <grpc/support/port_platform.h>
#include <utility>
namespace grpc_core {
template <class T1, class T2>
using Pair = std::pair<T1, T2>;
template <class T1, class T2>
inline Pair<typename std::decay<T1>::type, typename std::decay<T2>::type>
MakePair(T1&& u, T2&& v) {
typedef typename std::decay<T1>::type V1;
typedef typename std::decay<T2>::type V2;
return Pair<V1, V2>(std::forward<T1>(u), std::forward<T2>(v));
}
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_PAIR_H */

@ -38,6 +38,19 @@ grpc_cc_test(
], ],
) )
grpc_cc_test(
name = "grpc_core_map_test",
srcs = ["map_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:gpr_base",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test( grpc_cc_test(
name = "memory_test", name = "memory_test",
srcs = ["memory_test.cc"], srcs = ["memory_test.cc"],

@ -0,0 +1,409 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/gprpp/map.h"
#include <gtest/gtest.h>
#include "include/grpc/support/string_util.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
class Payload {
public:
Payload() : data_(-1) {}
explicit Payload(int data) : data_(data) {}
Payload(const Payload& other) : data_(other.data_) {}
Payload& operator=(const Payload& other) {
if (this != &other) {
data_ = other.data_;
}
return *this;
}
int data() { return data_; }
private:
int data_;
};
inline UniquePtr<char> CopyString(const char* string) {
return UniquePtr<char>(gpr_strdup(string));
}
static constexpr char kKeys[][4] = {"abc", "efg", "hij", "klm", "xyz"};
class MapTest : public ::testing::Test {
public:
template <class Key, class T, class Compare>
typename ::grpc_core::Map<Key, T, Compare>::Entry* Root(
typename ::grpc_core::Map<Key, T, Compare>* map) {
return map->root_;
}
};
// Test insertion of Payload
TEST_F(MapTest, EmplaceAndFind) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], Payload(i));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(kKeys[i])->second.data());
}
}
// Test insertion of Payload Unique Ptrs
TEST_F(MapTest, EmplaceAndFindWithUniquePtrValue) {
Map<const char*, UniquePtr<Payload>, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], MakeUnique<Payload>(i));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(kKeys[i])->second->data());
}
}
// Test insertion of Unique Ptr kKeys and Payload
TEST_F(MapTest, EmplaceAndFindWithUniquePtrKey) {
Map<UniquePtr<char>, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(CopyString(kKeys[i]), Payload(i));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(CopyString(kKeys[i]))->second.data());
}
}
// Test insertion of Payload
TEST_F(MapTest, InsertAndFind) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.insert(MakePair(kKeys[i], Payload(i)));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(kKeys[i])->second.data());
}
}
// Test insertion of Payload Unique Ptrs
TEST_F(MapTest, InsertAndFindWithUniquePtrValue) {
Map<const char*, UniquePtr<Payload>, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.insert(MakePair(kKeys[i], MakeUnique<Payload>(i)));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(kKeys[i])->second->data());
}
}
// Test insertion of Unique Ptr kKeys and Payload
TEST_F(MapTest, InsertAndFindWithUniquePtrKey) {
Map<UniquePtr<char>, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.insert(MakePair(CopyString(kKeys[i]), Payload(i)));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(CopyString(kKeys[i]))->second.data());
}
}
// Test bracket operators
TEST_F(MapTest, BracketOperator) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map[kKeys[i]] = Payload(i);
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map[kKeys[i]].data());
}
}
// Test bracket operators with unique pointer to payload
TEST_F(MapTest, BracketOperatorWithUniquePtrValue) {
Map<const char*, UniquePtr<Payload>, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map[kKeys[i]] = MakeUnique<Payload>(i);
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map[kKeys[i]]->data());
}
}
// Test bracket operators with unique pointer to payload
TEST_F(MapTest, BracketOperatorWithUniquePtrKey) {
Map<UniquePtr<char>, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map[CopyString(kKeys[i])] = Payload(i);
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map[CopyString(kKeys[i])].data());
}
}
// Test removal of a single value
TEST_F(MapTest, Erase) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], Payload(i));
}
EXPECT_EQ(test_map.size(), 5UL);
EXPECT_EQ(test_map.erase(kKeys[3]), 1UL); // Remove "hij"
for (int i = 0; i < 5; i++) {
if (i == 3) { // "hij" should not be present
EXPECT_TRUE(test_map.find(kKeys[i]) == test_map.end());
} else {
EXPECT_EQ(i, test_map.find(kKeys[i])->second.data());
}
}
EXPECT_EQ(test_map.size(), 4UL);
}
// Test removal of a single value with unique ptr to payload
TEST_F(MapTest, EraseWithUniquePtrValue) {
Map<const char*, UniquePtr<Payload>, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], MakeUnique<Payload>(i));
}
EXPECT_EQ(test_map.size(), 5UL);
test_map.erase(kKeys[3]); // Remove "hij"
for (int i = 0; i < 5; i++) {
if (i == 3) { // "hij" should not be present
EXPECT_TRUE(test_map.find(kKeys[i]) == test_map.end());
} else {
EXPECT_EQ(i, test_map.find(kKeys[i])->second->data());
}
}
EXPECT_EQ(test_map.size(), 4UL);
}
// Test removal of a single value
TEST_F(MapTest, EraseWithUniquePtrKey) {
Map<UniquePtr<char>, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(CopyString(kKeys[i]), Payload(i));
}
EXPECT_EQ(test_map.size(), 5UL);
test_map.erase(CopyString(kKeys[3])); // Remove "hij"
for (int i = 0; i < 5; i++) {
if (i == 3) { // "hij" should not be present
EXPECT_TRUE(test_map.find(CopyString(kKeys[i])) == test_map.end());
} else {
EXPECT_EQ(i, test_map.find(CopyString(kKeys[i]))->second.data());
}
}
EXPECT_EQ(test_map.size(), 4UL);
}
// Test clear
TEST_F(MapTest, SizeAndClear) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], Payload(i));
}
EXPECT_EQ(test_map.size(), 5UL);
EXPECT_FALSE(test_map.empty());
test_map.clear();
EXPECT_EQ(test_map.size(), 0UL);
EXPECT_TRUE(test_map.empty());
}
// Test clear with unique ptr payload
TEST_F(MapTest, SizeAndClearWithUniquePtrValue) {
Map<const char*, UniquePtr<Payload>, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], MakeUnique<Payload>(i));
}
EXPECT_EQ(test_map.size(), 5UL);
EXPECT_FALSE(test_map.empty());
test_map.clear();
EXPECT_EQ(test_map.size(), 0UL);
EXPECT_TRUE(test_map.empty());
}
// Test clear with unique ptr char key
TEST_F(MapTest, SizeAndClearWithUniquePtrKey) {
Map<UniquePtr<char>, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(CopyString(kKeys[i]), Payload(i));
}
EXPECT_EQ(test_map.size(), 5UL);
EXPECT_FALSE(test_map.empty());
test_map.clear();
EXPECT_EQ(test_map.size(), 0UL);
EXPECT_TRUE(test_map.empty());
}
// Test correction of Left-Left Tree imbalance
TEST_F(MapTest, MapLL) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 2; i >= 0; i--) {
test_map.emplace(kKeys[i], Payload(i));
}
EXPECT_EQ(strcmp(Root(&test_map)->pair.first, kKeys[1]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->pair.first, kKeys[0]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->pair.first, kKeys[2]), 0);
}
// Test correction of Left-Right tree imbalance
TEST_F(MapTest, MapLR) {
Map<const char*, Payload, StringLess> test_map;
int insertion_key_index[] = {2, 0, 1};
for (int i = 0; i < 3; i++) {
int key_index = insertion_key_index[i];
test_map.emplace(kKeys[key_index], Payload(key_index));
}
EXPECT_EQ(strcmp(Root(&test_map)->pair.first, kKeys[1]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->pair.first, kKeys[0]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->pair.first, kKeys[2]), 0);
}
// Test correction of Right-Left tree imbalance
TEST_F(MapTest, MapRL) {
Map<const char*, Payload, StringLess> test_map;
int insertion_key_index[] = {0, 2, 1};
for (int i = 0; i < 3; i++) {
int key_index = insertion_key_index[i];
test_map.emplace(kKeys[key_index], Payload(key_index));
}
EXPECT_EQ(strcmp(Root(&test_map)->pair.first, kKeys[1]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->pair.first, kKeys[0]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->pair.first, kKeys[2]), 0);
}
// Test correction of Right-Right tree imbalance
TEST_F(MapTest, MapRR) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], Payload(i));
}
EXPECT_EQ(strcmp(Root(&test_map)->pair.first, kKeys[1]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->pair.first, kKeys[0]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->pair.first, kKeys[3]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->left->pair.first, kKeys[2]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->right->pair.first, kKeys[4]), 0);
}
// Test correction after random insertion
TEST_F(MapTest, MapRandomInsertions) {
Map<const char*, Payload, StringLess> test_map;
int insertion_key_index[] = {1, 4, 3, 0, 2};
for (int i = 0; i < 5; i++) {
int key_index = insertion_key_index[i];
test_map.emplace(kKeys[key_index], Payload(key_index));
}
EXPECT_EQ(strcmp(Root(&test_map)->pair.first, kKeys[3]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->pair.first, kKeys[1]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->right->pair.first, kKeys[4]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->right->pair.first, kKeys[2]), 0);
EXPECT_EQ(strcmp(Root(&test_map)->left->left->pair.first, kKeys[0]), 0);
}
// Test Map iterator
TEST_F(MapTest, Iteration) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], Payload(i));
}
int count = 0;
for (auto iter = test_map.begin(); iter != test_map.end(); iter++) {
EXPECT_EQ(iter->second.data(), count);
count++;
}
EXPECT_EQ(count, 5);
}
// Test Map iterator with unique ptr payload
TEST_F(MapTest, IterationWithUniquePtrValue) {
Map<const char*, UniquePtr<Payload>, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], MakeUnique<Payload>(i));
}
int count = 0;
for (auto iter = test_map.begin(); iter != test_map.end(); iter++) {
EXPECT_EQ(iter->second->data(), count);
count++;
}
EXPECT_EQ(count, 5);
}
// Test Map iterator with unique ptr to char key
TEST_F(MapTest, IterationWithUniquePtrKey) {
Map<UniquePtr<char>, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(CopyString(kKeys[i]), Payload(i));
}
int count = 0;
for (auto iter = test_map.begin(); iter != test_map.end(); iter++) {
EXPECT_EQ(iter->second.data(), count);
count++;
}
EXPECT_EQ(count, 5);
}
// Test removing entries while iterating the map
TEST_F(MapTest, EraseUsingIterator) {
Map<const char*, Payload, StringLess> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(kKeys[i], Payload(i));
}
int count = 0;
for (auto iter = test_map.begin(); iter != test_map.end();) {
EXPECT_EQ(iter->second.data(), count);
iter = test_map.erase(iter);
count++;
}
EXPECT_EQ(count, 5);
EXPECT_TRUE(test_map.empty());
}
// Random ops on a Map with Integer key of Payload value,
// tests default comparator
TEST_F(MapTest, RandomOpsWithIntKey) {
Map<int, Payload> test_map;
for (int i = 0; i < 5; i++) {
test_map.emplace(i, Payload(i));
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i, test_map.find(i)->second.data());
}
for (int i = 0; i < 5; i++) {
test_map[i] = Payload(i + 10);
}
for (int i = 0; i < 5; i++) {
EXPECT_EQ(i + 10, test_map[i].data());
}
EXPECT_EQ(test_map.erase(3), 1UL);
EXPECT_TRUE(test_map.find(3) == test_map.end());
EXPECT_FALSE(test_map.empty());
EXPECT_EQ(test_map.size(), 4UL);
test_map.clear();
EXPECT_EQ(test_map.size(), 0UL);
EXPECT_TRUE(test_map.empty());
}
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -76,17 +76,17 @@ grpc_cc_library(
"tracer_util.h", "tracer_util.h",
"trickle_endpoint.h", "trickle_endpoint.h",
], ],
data = [
"lsan_suppressions.txt",
"tsan_suppressions.txt",
"ubsan_suppressions.txt",
],
language = "C++", language = "C++",
deps = [ deps = [
":grpc_debugger_macros", ":grpc_debugger_macros",
"//:gpr", "//:gpr",
"//:grpc_common", "//:grpc_common",
], ],
data = [
"lsan_suppressions.txt",
"tsan_suppressions.txt",
"ubsan_suppressions.txt",
],
) )
grpc_cc_library( grpc_cc_library(

@ -1075,10 +1075,12 @@ src/core/lib/gprpp/debug_location.h \
src/core/lib/gprpp/fork.h \ src/core/lib/gprpp/fork.h \
src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \ src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \ src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \
src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \ src/core/lib/gprpp/ref_counted_ptr.h \
src/core/lib/gprpp/thd.h \ src/core/lib/gprpp/thd.h \

@ -1164,10 +1164,12 @@ src/core/lib/gprpp/fork.cc \
src/core/lib/gprpp/fork.h \ src/core/lib/gprpp/fork.h \
src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \ src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \ src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \
src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \ src/core/lib/gprpp/ref_counted_ptr.h \
src/core/lib/gprpp/thd.h \ src/core/lib/gprpp/thd.h \

@ -3715,6 +3715,27 @@
"third_party": false, "third_party": false,
"type": "target" "type": "target"
}, },
{
"deps": [
"gpr",
"grpc",
"grpc++",
"grpc++_test",
"grpc_test_util"
],
"headers": [
"test/core/gprpp/map_tester.h"
],
"is_filegroup": false,
"language": "c++",
"name": "grpc_core_map_test",
"src": [
"test/core/gprpp/map_test.cc",
"test/core/gprpp/map_tester.h"
],
"third_party": false,
"type": "target"
},
{ {
"deps": [ "deps": [
"grpc_plugin_support" "grpc_plugin_support"
@ -7988,8 +8009,10 @@
"src/core/lib/gprpp/atomic.h", "src/core/lib/gprpp/atomic.h",
"src/core/lib/gprpp/fork.h", "src/core/lib/gprpp/fork.h",
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h" "src/core/lib/profiling/timers.h"
], ],
@ -8034,8 +8057,10 @@
"src/core/lib/gprpp/atomic.h", "src/core/lib/gprpp/atomic.h",
"src/core/lib/gprpp/fork.h", "src/core/lib/gprpp/fork.h",
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h" "src/core/lib/profiling/timers.h"
], ],

@ -4549,6 +4549,30 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "grpc_core_map_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save