pull/34915/head
Craig Tiller 1 year ago
commit 8e4f033317
  1. 15
      .github/workflows/pr-auto-tag.yaml
  2. 9
      BUILD
  3. 44
      CMakeLists.txt
  4. 46
      Makefile
  5. 3
      OWNERS
  6. 2
      _metadata.py
  7. 2
      bazel/googleapis.BUILD
  8. 18
      build_autogenerated.yaml
  9. 2
      build_config.rb
  10. 6
      build_handwritten.yaml
  11. 2
      config.m4
  12. 3
      doc/g_stands_for.md
  13. 2
      gRPC-C++.podspec
  14. 2
      gRPC-Core.podspec
  15. 2
      gRPC-ProtoRPC.podspec
  16. 2
      gRPC-RxLibrary.podspec
  17. 2
      gRPC.podspec
  18. 2
      include/grpc/impl/channel_arg_names.h
  19. 4
      include/grpcpp/version_info.h
  20. 6
      package.xml
  21. 29
      src/core/BUILD
  22. 7
      src/core/ext/filters/client_channel/client_channel.cc
  23. 59
      src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
  24. 5
      src/core/ext/filters/client_channel/lb_policy/address_filtering.h
  25. 13
      src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
  26. 8
      src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
  27. 195
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  28. 5
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  29. 107
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  30. 3
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  31. 18
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  32. 27
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  33. 29
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  34. 20
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  35. 44
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
  36. 15
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  37. 16
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  38. 4
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  39. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  40. 71
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  41. 83
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  42. 9
      src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
  43. 30
      src/core/lib/event_engine/posix_engine/timer_manager.cc
  44. 3
      src/core/lib/event_engine/posix_engine/timer_manager.h
  45. 13
      src/core/lib/gprpp/ref_counted_string.h
  46. 2
      src/core/lib/load_balancing/lb_policy.h
  47. 19
      src/core/lib/promise/activity.cc
  48. 9
      src/core/lib/promise/activity.h
  49. 98
      src/core/lib/promise/inter_activity_latch.h
  50. 7
      src/core/lib/promise/latch.h
  51. 2
      src/core/lib/promise/party.cc
  52. 103
      src/core/lib/promise/party.h
  53. 6
      src/core/lib/promise/wait_set.h
  54. 1
      src/core/lib/resolver/endpoint_addresses.cc
  55. 48
      src/core/lib/resolver/endpoint_addresses.h
  56. 4
      src/core/lib/surface/version.cc
  57. 2
      src/csharp/build/dependencies.props
  58. 2
      src/objective-c/!ProtoCompiler-gRPCCppPlugin.podspec
  59. 2
      src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
  60. 2
      src/objective-c/GRPCClient/version.h
  61. 2
      src/objective-c/RxLibrary/GRXConcurrentWriteable.m
  62. 2
      src/objective-c/RxLibrary/GRXMappingWriter.h
  63. 2
      src/objective-c/tests/Common/GRPCBlockCallbackResponseHandler.m
  64. 4
      src/objective-c/tests/version.h
  65. 2
      src/php/composer.json
  66. 2
      src/php/ext/grpc/version.h
  67. 2
      src/proto/grpc/testing/BUILD
  68. 2
      src/proto/grpc/testing/xds/v3/BUILD
  69. 2
      src/python/grpcio/grpc/_grpcio_metadata.py
  70. 2
      src/python/grpcio/grpc_version.py
  71. 2
      src/python/grpcio_admin/grpc_version.py
  72. 2
      src/python/grpcio_channelz/grpc_version.py
  73. 2
      src/python/grpcio_csds/grpc_version.py
  74. 2
      src/python/grpcio_health_checking/grpc_version.py
  75. 2
      src/python/grpcio_reflection/grpc_version.py
  76. 2
      src/python/grpcio_status/grpc_version.py
  77. 2
      src/python/grpcio_testing/grpc_version.py
  78. 2
      src/python/grpcio_tests/grpc_version.py
  79. 2
      src/ruby/lib/grpc/version.rb
  80. 2
      src/ruby/nativedebug/version.rb
  81. 2
      src/ruby/tools/version.rb
  82. 3
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  83. 6
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  84. 22
      test/core/promise/BUILD
  85. 103
      test/core/promise/inter_activity_latch_test.cc
  86. 26
      test/core/promise/party_test.cc
  87. 6
      test/core/util/test_lb_policies.cc
  88. 125
      test/cpp/end2end/xds/xds_end2end_test.cc
  89. 2
      tools/distrib/python/grpc_version.py
  90. 2
      tools/distrib/python/grpcio_tools/grpc_version.py
  91. 2
      tools/distrib/python/xds_protos/grpc_version.py
  92. 2
      tools/doxygen/Doxyfile.c++
  93. 2
      tools/doxygen/Doxyfile.c++.internal
  94. 2
      tools/doxygen/Doxyfile.core
  95. 2
      tools/doxygen/Doxyfile.core.internal
  96. 2
      tools/doxygen/Doxyfile.objc
  97. 2
      tools/doxygen/Doxyfile.objc.internal
  98. 2
      tools/doxygen/Doxyfile.php
  99. 2
      tools/internal_ci/windows/pull_request/grpc_distribtests_cpp_dll.cfg
  100. 24
      tools/run_tests/generated/tests.json

@ -1,4 +1,4 @@
name: PR Title Check & Tag
name: PR Auto Tag
on:
pull_request_target:
types: [opened, reopened, synchronize, edited]
@ -17,16 +17,3 @@ jobs:
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
sync-labels: ""
title-check:
permissions:
contents: read
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: thehanimo/pr-title-checker@v1.3.5
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
pass_on_octokit_error: false
configuration_path: ".github/pr_title_checker_config.json"

@ -30,8 +30,8 @@ licenses(["reciprocal"])
package(
default_visibility = ["//visibility:public"],
features = [
"layering_check",
"-parse_headers",
"layering_check",
],
)
@ -211,11 +211,11 @@ config_setting(
python_config_settings()
# This should be updated along with build_handwritten.yaml
g_stands_for = "gjallarhorn" # @unused
g_stands_for = "grand" # @unused
core_version = "36.0.0" # @unused
core_version = "37.0.0" # @unused
version = "1.60.0-dev" # @unused
version = "1.61.0-dev" # @unused
GPR_PUBLIC_HDRS = [
"include/grpc/support/alloc.h",
@ -2941,6 +2941,7 @@ grpc_cc_library(
"//src/core:lib/resolver/endpoint_addresses.h",
],
external_deps = [
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",

44
CMakeLists.txt generated

@ -25,11 +25,11 @@
cmake_minimum_required(VERSION 3.8)
set(PACKAGE_NAME "grpc")
set(PACKAGE_VERSION "1.60.0-dev")
set(gRPC_CORE_VERSION "36.0.0")
set(gRPC_CORE_SOVERSION "36")
set(gRPC_CPP_VERSION "1.60.0-dev")
set(gRPC_CPP_SOVERSION "1.60")
set(PACKAGE_VERSION "1.61.0-dev")
set(gRPC_CORE_VERSION "37.0.0")
set(gRPC_CORE_SOVERSION "37")
set(gRPC_CPP_VERSION "1.61.0-dev")
set(gRPC_CPP_SOVERSION "1.61")
set(PACKAGE_STRING "${PACKAGE_NAME} ${PACKAGE_VERSION}")
set(PACKAGE_TARNAME "${PACKAGE_NAME}-${PACKAGE_VERSION}")
set(PACKAGE_BUGREPORT "https://github.com/grpc/grpc/issues/")
@ -1109,6 +1109,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx init_test)
add_dependencies(buildtests_cxx initial_settings_frame_bad_client_test)
add_dependencies(buildtests_cxx insecure_security_connector_test)
add_dependencies(buildtests_cxx inter_activity_latch_test)
add_dependencies(buildtests_cxx inter_activity_pipe_test)
add_dependencies(buildtests_cxx interceptor_list_test)
add_dependencies(buildtests_cxx interop_client)
@ -14936,6 +14937,39 @@ target_link_libraries(insecure_security_connector_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(inter_activity_latch_test
test/core/promise/inter_activity_latch_test.cc
)
target_compile_features(inter_activity_latch_test PUBLIC cxx_std_14)
target_include_directories(inter_activity_latch_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(inter_activity_latch_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc
)
endif()
if(gRPC_BUILD_TESTS)

46
Makefile generated

@ -410,8 +410,8 @@ E = @echo
Q = @
endif
CORE_VERSION = 36.0.0
CPP_VERSION = 1.60.0-dev
CORE_VERSION = 37.0.0
CPP_VERSION = 1.61.0-dev
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)
@ -447,7 +447,7 @@ SHARED_EXT_CORE = dll
SHARED_EXT_CPP = dll
SHARED_PREFIX =
SHARED_VERSION_CORE = -36
SHARED_VERSION_CORE = -37
SHARED_VERSION_CPP = -1
else ifeq ($(SYSTEM),Darwin)
EXECUTABLE_SUFFIX =
@ -814,8 +814,8 @@ $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE):
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)address_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBADDRESS_SORTING_OBJS) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libaddress_sorting.so.36 -o $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBADDRESS_SORTING_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)address_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libaddress_sorting.so.37 -o $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBADDRESS_SORTING_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)address_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)address_sorting$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libaddress_sorting$(SHARED_VERSION_CORE).so
endif
endif
@ -938,8 +938,8 @@ $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGPR_OB
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(GRPC_ABSEIL_MERGE_LIBS) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.36 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(GRPC_ABSEIL_MERGE_LIBS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgpr.so.37 -o $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGPR_OBJS) $(GRPC_ABSEIL_MERGE_LIBS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)gpr$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgpr$(SHARED_VERSION_CORE).so
endif
endif
@ -1848,8 +1848,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBGRPC_
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(GRPC_ABSEIL_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libcares.a $(ZLIB_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libre2.a $(LIBDIR)/$(CONFIG)/libupb.a $(LIBDIR)/$(CONFIG)/libupb_textformat_lib.a $(LIBDIR)/$(CONFIG)/libupb_json_lib.a $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LIBDIR)/$(CONFIG)/libupb_collections_lib.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.36 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(GRPC_ABSEIL_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libcares.a $(ZLIB_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libre2.a $(LIBDIR)/$(CONFIG)/libupb.a $(LIBDIR)/$(CONFIG)/libupb_textformat_lib.a $(LIBDIR)/$(CONFIG)/libupb_json_lib.a $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LIBDIR)/$(CONFIG)/libupb_collections_lib.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc.so.37 -o $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_OBJS) $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(GRPC_ABSEIL_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libcares.a $(ZLIB_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libre2.a $(LIBDIR)/$(CONFIG)/libupb.a $(LIBDIR)/$(CONFIG)/libupb_textformat_lib.a $(LIBDIR)/$(CONFIG)/libupb_json_lib.a $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LIBDIR)/$(CONFIG)/libupb_collections_lib.a $(OPENSSL_MERGE_LIBS) $(LDLIBS_SECURE) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)grpc$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc$(SHARED_VERSION_CORE).so
endif
endif
@ -2364,8 +2364,8 @@ $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(GRPC_ABSEIL_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libcares.a $(ZLIB_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LIBDIR)/$(CONFIG)/libupb.a $(LIBDIR)/$(CONFIG)/libupb_collections_lib.a $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.36 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(GRPC_ABSEIL_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libcares.a $(ZLIB_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LIBDIR)/$(CONFIG)/libupb.a $(LIBDIR)/$(CONFIG)/libupb_collections_lib.a $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libgrpc_unsecure.so.37 -o $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBGRPC_UNSECURE_OBJS) $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libgpr.a $(GRPC_ABSEIL_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libcares.a $(ZLIB_MERGE_LIBS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LIBDIR)/$(CONFIG)/libupb.a $(LIBDIR)/$(CONFIG)/libupb_collections_lib.a $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)grpc_unsecure$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libgrpc_unsecure$(SHARED_VERSION_CORE).so
endif
endif
@ -2431,8 +2431,8 @@ $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBRE2_OB
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)re2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBRE2_OBJS) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libre2.so.36 -o $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBRE2_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)re2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libre2.so.37 -o $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBRE2_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)re2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)re2$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libre2$(SHARED_VERSION_CORE).so
endif
endif
@ -2479,8 +2479,8 @@ $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(LIBUPB_OB
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)upb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_OBJS) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb.so.36 -o $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb.so.37 -o $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)upb$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb$(SHARED_VERSION_CORE).so
endif
endif
@ -2535,8 +2535,8 @@ $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CO
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)upb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_COLLECTIONS_LIB_OBJS) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb_collections_lib.so.36 -o $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_COLLECTIONS_LIB_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb_collections_lib.so.37 -o $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_COLLECTIONS_LIB_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)upb_collections_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_collections_lib$(SHARED_VERSION_CORE).so
endif
endif
@ -2626,8 +2626,8 @@ $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE): $(
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)upb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_JSON_LIB_OBJS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb_json_lib.so.36 -o $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_JSON_LIB_OBJS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb_json_lib.so.37 -o $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_JSON_LIB_OBJS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)upb_json_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_json_lib$(SHARED_VERSION_CORE).so
endif
endif
@ -2716,8 +2716,8 @@ $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_COR
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)upb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_TEXTFORMAT_LIB_OBJS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb_textformat_lib.so.36 -o $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_TEXTFORMAT_LIB_OBJS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libupb_textformat_lib.so.37 -o $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUPB_TEXTFORMAT_LIB_OBJS) $(LIBDIR)/$(CONFIG)/libutf8_range_lib.a $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)upb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)upb_textformat_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libupb_textformat_lib$(SHARED_VERSION_CORE).so
endif
endif
@ -2764,8 +2764,8 @@ $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE):
ifeq ($(SYSTEM),Darwin)
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -install_name $(SHARED_PREFIX)utf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) -dynamiclib -o $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUTF8_RANGE_LIB_OBJS) $(LDLIBS)
else
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libutf8_range_lib.so.36 -o $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUTF8_RANGE_LIB_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)utf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).so.36
$(Q) $(LDXX) $(LDFLAGS) -L$(LIBDIR)/$(CONFIG) -shared -Wl,-soname,libutf8_range_lib.so.37 -o $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBUTF8_RANGE_LIB_OBJS) $(LDLIBS)
$(Q) ln -sf $(SHARED_PREFIX)utf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).so.37
$(Q) ln -sf $(SHARED_PREFIX)utf8_range_lib$(SHARED_VERSION_CORE).$(SHARED_EXT_CORE) $(LIBDIR)/$(CONFIG)/libutf8_range_lib$(SHARED_VERSION_CORE).so
endif
endif

@ -1,3 +0,0 @@
# Top level ownership
@markdroth **/OWNERS
@a11r **/OWNERS

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/_metadata.py.template`!!!
__version__ = """1.60.0.dev0"""
__version__ = """1.61.0.dev0"""

@ -15,7 +15,7 @@
licenses(["notice"])
package(
default_visibility = ["//visibility:public"]
default_visibility = ["//visibility:public"],
)
# This is needed for the dependency on google_cloud_cpp to work.

@ -10835,6 +10835,20 @@ targets:
deps:
- gtest
- grpc_test_util
- name: inter_activity_latch_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_latch.h
- src/core/lib/promise/wait_set.h
src:
- test/core/promise/inter_activity_latch_test.cc
deps:
- gtest
- grpc
uses_polling: false
- name: inter_activity_pipe_test
gtest: true
build: test
@ -12434,7 +12448,9 @@ targets:
gtest: true
build: test
language: c++
headers: []
headers:
- src/core/lib/promise/inter_activity_latch.h
- src/core/lib/promise/wait_set.h
src:
- test/core/promise/party_test.cc
deps:

@ -13,5 +13,5 @@
# limitations under the License.
module GrpcBuildConfig
CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-36.dll'
CORE_WINDOWS_DLL = '/tmp/libs/opt/grpc-37.dll'
end

@ -12,11 +12,11 @@ settings:
'#08': Use "-preN" suffixes to identify pre-release versions
'#09': Per-language overrides are possible with (eg) ruby_version tag here
'#10': See the expand_version.py for all the quirks here
core_version: 36.0.0
core_version: 37.0.0
csharp_major_version: 2
g_stands_for: gjallarhorn
g_stands_for: grand
protobuf_version: 3.25.0
version: 1.60.0-dev
version: 1.61.0-dev
configs:
asan:
CC: clang

2
config.m4 generated

@ -1348,7 +1348,7 @@ if test "$PHP_GRPC" != "no"; then
-D_HAS_EXCEPTIONS=0 -DNOMINMAX -DGRPC_ARES=0 \
-DGRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1 \
-DGRPC_XDS_USER_AGENT_NAME_SUFFIX='"\"PHP\""' \
-DGRPC_XDS_USER_AGENT_VERSION_SUFFIX='"\"1.60.0dev\""')
-DGRPC_XDS_USER_AGENT_VERSION_SUFFIX='"\"1.61.0dev\""')
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/backend_metrics)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/census)

@ -59,4 +59,5 @@
- 1.57 'g' stands for ['grounded'](https://github.com/grpc/grpc/tree/v1.57.x)
- 1.58 'g' stands for ['goku'](https://github.com/grpc/grpc/tree/v1.58.x)
- 1.59 'g' stands for ['generative'](https://github.com/grpc/grpc/tree/v1.59.x)
- 1.60 'g' stands for ['gjallarhorn'](https://github.com/grpc/grpc/tree/master)
- 1.60 'g' stands for ['gjallarhorn'](https://github.com/grpc/grpc/tree/v1.60.x)
- 1.61 'g' stands for ['grand'](https://github.com/grpc/grpc/tree/master)

2
gRPC-C++.podspec generated

@ -22,7 +22,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-C++'
# TODO (mxyan): use version that match gRPC version when pod is stabilized
version = '1.60.0-dev'
version = '1.61.0-dev'
s.version = version
s.summary = 'gRPC C++ library'
s.homepage = 'https://grpc.io'

2
gRPC-Core.podspec generated

@ -21,7 +21,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-Core'
version = '1.60.0-dev'
version = '1.61.0-dev'
s.version = version
s.summary = 'Core cross-platform gRPC library, written in C'
s.homepage = 'https://grpc.io'

@ -21,7 +21,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-ProtoRPC'
version = '1.60.0-dev'
version = '1.61.0-dev'
s.version = version
s.summary = 'RPC library for Protocol Buffers, based on gRPC'
s.homepage = 'https://grpc.io'

@ -21,7 +21,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC-RxLibrary'
version = '1.60.0-dev'
version = '1.61.0-dev'
s.version = version
s.summary = 'Reactive Extensions library for iOS/OSX.'
s.homepage = 'https://grpc.io'

2
gRPC.podspec generated

@ -20,7 +20,7 @@
Pod::Spec.new do |s|
s.name = 'gRPC'
version = '1.60.0-dev'
version = '1.61.0-dev'
s.version = version
s.summary = 'gRPC client library for iOS/OSX'
s.homepage = 'https://grpc.io'

@ -15,7 +15,7 @@
#ifndef GRPC_IMPL_CHANNEL_ARG_NAMES_H
#define GRPC_IMPL_CHANNEL_ARG_NAMES_H
// IWYU pragma: private, include "third_party/grpc/include/grpc/grpc.h"
// IWYU pragma: private, include <grpc/grpc.h>
// IWYU pragma: friend "src/.*"
// IWYU pragma: friend "test/.*"

@ -19,9 +19,9 @@
#define GRPCPP_VERSION_INFO_H
#define GRPC_CPP_VERSION_MAJOR 1
#define GRPC_CPP_VERSION_MINOR 60
#define GRPC_CPP_VERSION_MINOR 61
#define GRPC_CPP_VERSION_PATCH 0
#define GRPC_CPP_VERSION_TAG "dev"
#define GRPC_CPP_VERSION_STRING "1.60.0-dev"
#define GRPC_CPP_VERSION_STRING "1.61.0-dev"
#endif // GRPCPP_VERSION_INFO_H

6
package.xml generated

@ -13,8 +13,8 @@
<date>2019-09-24</date>
<time>16:06:07</time>
<version>
<release>1.60.0dev</release>
<api>1.60.0dev</api>
<release>1.61.0dev</release>
<api>1.61.0dev</api>
</version>
<stability>
<release>beta</release>
@ -22,7 +22,7 @@
</stability>
<license>Apache 2.0</license>
<notes>
- gRPC Core 1.60.0 update
- gRPC Core 1.61.0 update
</notes>
<contents>
<dir baseinstalldir="/" name="/">

@ -478,6 +478,7 @@ grpc_cc_library(
"arena",
"construct_destruct",
"context",
"poll",
"promise_factory",
"promise_trace",
"ref_counted",
@ -874,6 +875,25 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "inter_activity_latch",
external_deps = [
"absl/base:core_headers",
"absl/strings",
],
language = "c++",
public_hdrs = [
"lib/promise/inter_activity_latch.h",
],
deps = [
"activity",
"poll",
"promise_trace",
"wait_set",
"//:gpr",
],
)
grpc_cc_library(
name = "interceptor_list",
hdrs = [
@ -4120,6 +4140,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -4647,6 +4668,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc",
],
external_deps = [
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -4669,6 +4691,7 @@ grpc_cc_library(
"no_destruct",
"pollset_set",
"ref_counted_string",
"resolved_address",
"validation_errors",
"//:channel_arg_names",
"//:config",
@ -4817,6 +4840,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/address_filtering.h",
],
external_deps = [
"absl/functional:function_ref",
"absl/status:statusor",
"absl/strings",
],
@ -4825,6 +4849,7 @@ grpc_cc_library(
"channel_args",
"ref_counted",
"ref_counted_string",
"resolved_address",
"//:endpoint_addresses",
"//:gpr_platform",
"//:ref_counted_ptr",
@ -4894,6 +4919,7 @@ grpc_cc_library(
"lb_policy",
"subchannel_interface",
"//:debug_location",
"//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:ref_counted_ptr",
@ -4911,7 +4937,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/endpoint_list.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/types:optional",
@ -5305,6 +5331,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",

@ -1599,7 +1599,12 @@ absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
Resolver::Result result) {
// Construct update.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
if (!result.addresses.ok()) {
update_args.addresses = result.addresses.status();
} else {
update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
std::move(*result.addresses));
}
update_args.config = std::move(lb_policy_config);
update_args.resolution_note = std::move(result.resolution_note);
// Remove the config selector from channel args so that we're not holding

@ -20,11 +20,13 @@
#include <stddef.h>
#include <algorithm>
#include <utility>
#include "absl/functional/function_ref.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_core {
@ -43,20 +45,26 @@ int HierarchicalPathArg::ChannelArgsCompare(const HierarchicalPathArg* a,
return 0;
}
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
const absl::StatusOr<EndpointAddressesList>& addresses) {
if (!addresses.ok()) return addresses.status();
HierarchicalAddressMap result;
namespace {
class HierarchicalAddressIterator : public EndpointAddressesIterator {
public:
HierarchicalAddressIterator(
std::shared_ptr<EndpointAddressesIterator> parent_it,
RefCountedStringValue child_name)
: parent_it_(std::move(parent_it)), child_name_(std::move(child_name)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
RefCountedPtr<HierarchicalPathArg> remaining_path_attr;
for (const EndpointAddresses& endpoint_addresses : *addresses) {
const auto* path_arg =
endpoint_addresses.args().GetObject<HierarchicalPathArg>();
if (path_arg == nullptr) continue;
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
const auto* path_arg = endpoint.args().GetObject<HierarchicalPathArg>();
if (path_arg == nullptr) return;
const std::vector<RefCountedStringValue>& path = path_arg->path();
auto it = path.begin();
if (it == path.end()) continue;
EndpointAddressesList& target_list = result[*it];
ChannelArgs args = endpoint_addresses.args();
if (it == path.end()) return;
if (*it != child_name_) return;
ChannelArgs args = endpoint.args();
++it;
if (it != path.end()) {
std::vector<RefCountedStringValue> remaining_path(it, path.end());
@ -67,8 +75,33 @@ absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
}
args = args.SetObject(remaining_path_attr);
}
target_list.emplace_back(endpoint_addresses.addresses(), args);
callback(EndpointAddresses(endpoint.addresses(), args));
});
}
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
RefCountedStringValue child_name_;
};
} // namespace
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses) {
if (!addresses.ok()) return addresses.status();
HierarchicalAddressMap result;
(*addresses)->ForEach([&](const EndpointAddresses& endpoint) {
const auto* path_arg = endpoint.args().GetObject<HierarchicalPathArg>();
if (path_arg == nullptr) return;
const std::vector<RefCountedStringValue>& path = path_arg->path();
auto it = path.begin();
if (it == path.end()) return;
auto& target_list = result[*it];
if (target_list == nullptr) {
target_list =
std::make_shared<HierarchicalAddressIterator>(*addresses, *it);
}
});
return result;
}

@ -20,6 +20,7 @@
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include <utility>
#include <vector>
@ -105,12 +106,12 @@ class HierarchicalPathArg : public RefCounted<HierarchicalPathArg> {
// A map from the next path element to the endpoint addresses that fall
// under that path element.
using HierarchicalAddressMap =
std::map<RefCountedStringValue, EndpointAddressesList,
std::map<RefCountedStringValue, std::shared_ptr<EndpointAddressesIterator>,
RefCountedStringValueLessThan>;
// Splits up the addresses into a separate list for each child.
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
const absl::StatusOr<EndpointAddressesList>& addresses);
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses);
} // namespace grpc_core

@ -118,7 +118,7 @@ void EndpointList::Endpoint::Init(
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses.emplace().emplace_back(addresses);
update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
update_args.args = child_args;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
@ -163,15 +163,16 @@ RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
//
void EndpointList::Init(
const EndpointAddressesList& endpoints, const ChannelArgs& args,
absl::AnyInvocable<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
EndpointAddressesIterator* endpoints, const ChannelArgs& args,
absl::FunctionRef<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
const EndpointAddresses&,
const ChannelArgs&)>
create_endpoint) {
for (const EndpointAddresses& addresses : endpoints) {
if (endpoints == nullptr) return;
endpoints->ForEach([&](const EndpointAddresses& endpoint) {
endpoints_.push_back(
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), addresses, args));
}
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args));
});
}
void EndpointList::ResetBackoffLocked() {

@ -25,7 +25,7 @@
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
@ -53,7 +53,7 @@ namespace grpc_core {
class MyEndpointList : public EndpointList {
public:
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy,
const EndpointAddressesList& endpoints,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
: EndpointList(std::move(lb_policy),
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer)
@ -184,8 +184,8 @@ class EndpointList : public InternallyRefCounted<EndpointList> {
EndpointList(RefCountedPtr<LoadBalancingPolicy> policy, const char* tracer)
: policy_(std::move(policy)), tracer_(tracer) {}
void Init(const EndpointAddressesList& endpoints, const ChannelArgs& args,
absl::AnyInvocable<OrphanablePtr<Endpoint>(
void Init(EndpointAddressesIterator* endpoints, const ChannelArgs& args,
absl::FunctionRef<OrphanablePtr<Endpoint>(
RefCountedPtr<EndpointList>, const EndpointAddresses&,
const ChannelArgs&)>
create_endpoint);

@ -72,6 +72,7 @@
#include <vector>
#include "absl/container/inlined_vector.h"
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -384,9 +385,9 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns a text representation suitable for logging.
std::string AsText() const;
// Extracts all non-drop entries into an EndpointAddressesList.
EndpointAddressesList GetServerAddressList(
GrpcLbClientStats* client_stats) const;
// Extracts all non-drop entries into an EndpointAddressesIterator.
std::shared_ptr<EndpointAddressesIterator> GetServerAddressList(
GrpcLbClientStats* client_stats);
// Returns true if the serverlist contains at least one drop entry and
// no backend address entries.
@ -400,6 +401,8 @@ class GrpcLb : public LoadBalancingPolicy {
const char* ShouldDrop();
private:
class AddressIterator;
std::vector<GrpcLbServer> serverlist_;
// Accessed from the picker, so needs synchronization.
@ -504,6 +507,8 @@ class GrpcLb : public LoadBalancingPolicy {
RefCountedPtr<GrpcLb> parent_;
};
class NullLbTokenEndpointIterator;
void ShutdownLocked() override;
// Helper functions used in UpdateLocked().
@ -569,7 +574,8 @@ class GrpcLb : public LoadBalancingPolicy {
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// The backend addresses from the resolver.
absl::StatusOr<EndpointAddressesList> fallback_backend_addresses_;
absl::StatusOr<std::shared_ptr<NullLbTokenEndpointIterator>>
fallback_backend_addresses_;
// The last resolution note from our parent.
// To be passed to child policy when fallback_backend_addresses_ is empty.
std::string resolution_note_;
@ -594,11 +600,30 @@ class GrpcLb : public LoadBalancingPolicy {
};
//
// GrpcLb::Serverlist
// GrpcLb::Serverlist::AddressIterator
//
bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
return serverlist_ == other.serverlist_;
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %" PRIuPTR
" of serverlist. Ignoring.",
server.port, idx);
}
return false;
}
if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
" of serverlist. Ignoring",
server.ip_size, idx);
}
return false;
}
return true;
}
void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
@ -623,6 +648,53 @@ void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
}
}
class GrpcLb::Serverlist::AddressIterator : public EndpointAddressesIterator {
public:
AddressIterator(RefCountedPtr<Serverlist> serverlist,
RefCountedPtr<GrpcLbClientStats> client_stats)
: serverlist_(std::move(serverlist)),
client_stats_(std::move(client_stats)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_->serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
// Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
auto addr_uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token "
"will be used instead",
addr_uri.ok() ? addr_uri->c_str()
: addr_uri.status().ToString().c_str());
}
// Return address with a channel arg containing LB token and stats object.
callback(EndpointAddresses(
addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
std::move(lb_token), client_stats_))));
}
}
private:
RefCountedPtr<Serverlist> serverlist_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
//
// GrpcLb::Serverlist
//
bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
return serverlist_ == other.serverlist_;
}
std::string GrpcLb::Serverlist::AsText() const {
std::vector<std::string> entries;
for (size_t i = 0; i < serverlist_.size(); ++i) {
@ -642,59 +714,12 @@ std::string GrpcLb::Serverlist::AsText() const {
return absl::StrJoin(entries, "");
}
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %" PRIuPTR
" of serverlist. Ignoring.",
server.port, idx);
}
return false;
}
if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
" of serverlist. Ignoring",
server.ip_size, idx);
}
return false;
}
return true;
}
// Returns addresses extracted from the serverlist.
EndpointAddressesList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
std::shared_ptr<EndpointAddressesIterator>
GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) {
RefCountedPtr<GrpcLbClientStats> stats;
if (client_stats != nullptr) stats = client_stats->Ref();
EndpointAddressesList endpoints;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
// Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
auto addr_uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
addr_uri.ok() ? addr_uri->c_str()
: addr_uri.status().ToString().c_str());
}
// Add address with a channel arg containing LB token and stats object.
endpoints.emplace_back(
addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
std::move(lb_token), stats)));
}
return endpoints;
return std::make_shared<AddressIterator>(Ref(), std::move(stats));
}
bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
@ -1503,6 +1528,31 @@ void GrpcLb::ResetBackoffLocked() {
}
}
// Endpoint iterator wrapper to add null LB token attribute.
class GrpcLb::NullLbTokenEndpointIterator : public EndpointAddressesIterator {
public:
explicit NullLbTokenEndpointIterator(
std::shared_ptr<EndpointAddressesIterator> parent_it)
: parent_it_(std::move(parent_it)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
endpoint.ToString().c_str());
}
callback(EndpointAddresses(endpoint.addresses(),
endpoint.args().SetObject(empty_token_)));
});
}
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
MakeRefCounted<TokenAndClientStatsArg>("", nullptr);
};
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] received update", this);
@ -1512,19 +1562,11 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
GPR_ASSERT(config_ != nullptr);
args_ = std::move(args.args);
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
// Add null LB token attributes.
for (EndpointAddresses& endpoint : *fallback_backend_addresses_) {
endpoint = EndpointAddresses(
endpoint.addresses(),
endpoint.args().SetObject(
MakeRefCounted<TokenAndClientStatsArg>("", nullptr)));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
endpoint.ToString().c_str());
}
}
if (!args.addresses.ok()) {
fallback_backend_addresses_ = args.addresses.status();
} else {
fallback_backend_addresses_ = std::make_shared<NullLbTokenEndpointIterator>(
std::move(*args.addresses));
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
@ -1756,6 +1798,12 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
return lb_policy;
}
bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) {
bool empty = true;
endpoints.ForEach([&](const EndpointAddresses&) { empty = false; });
return empty;
}
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
@ -1769,16 +1817,17 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
// picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
EndpointIteratorIsEmpty(**fallback_backend_addresses_)) {
update_args.resolution_note = absl::StrCat(
"grpclb in fallback mode without any balancer addresses: ",
"grpclb in fallback mode without any fallback addresses: ",
resolution_note_);
}
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
if (update_args.addresses.ok() && update_args.addresses->empty()) {
if (update_args.addresses.ok() &&
EndpointIteratorIsEmpty(**update_args.addresses)) {
update_args.resolution_note = "empty serverlist from grpclb balancer";
}
}

@ -661,7 +661,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (args.addresses.ok()) {
std::set<EndpointAddressSet> current_endpoints;
std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
for (const EndpointAddresses& endpoint : *args.addresses) {
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
EndpointAddressSet key(endpoint.addresses());
current_endpoints.emplace(key);
for (const grpc_resolved_address& address : endpoint.addresses()) {
@ -708,7 +708,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
}
it->second->DisableEjection();
}
}
});
// Remove any entries we no longer need in the subchannel map.
for (auto it = subchannel_state_map_.begin();
it != subchannel_state_map_.end();) {
@ -753,7 +753,6 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_policy();
// Update the policy.
update_args.args = std::move(args.args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,

@ -21,7 +21,6 @@
#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <set>
#include <string>
@ -114,7 +113,7 @@ class PickFirst : public LoadBalancingPolicy {
public:
class SubchannelData {
public:
SubchannelData(SubchannelList* subchannel_list,
SubchannelData(SubchannelList* subchannel_list, size_t index,
RefCountedPtr<SubchannelInterface> subchannel);
SubchannelInterface* subchannel() const { return subchannel_.get(); }
@ -125,12 +124,6 @@ class PickFirst : public LoadBalancingPolicy {
return connectivity_status_;
}
// Returns the index into the subchannel list of this object.
size_t Index() const {
return static_cast<size_t>(this -
&subchannel_list_->subchannels_.front());
}
// Resets the connection backoff.
void ResetBackoffLocked() {
if (subchannel_ != nullptr) subchannel_->ResetBackoff();
@ -153,10 +146,8 @@ class PickFirst : public LoadBalancingPolicy {
class Watcher
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
Watcher(SubchannelData* subchannel_data,
RefCountedPtr<SubchannelList> subchannel_list)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)) {}
Watcher(RefCountedPtr<SubchannelList> subchannel_list, size_t index)
: subchannel_list_(std::move(subchannel_list)), index_(index) {}
~Watcher() override {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
@ -164,8 +155,8 @@ class PickFirst : public LoadBalancingPolicy {
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override {
subchannel_data_->OnConnectivityStateChange(new_state,
std::move(status));
subchannel_list_->subchannels_[index_].OnConnectivityStateChange(
new_state, std::move(status));
}
grpc_pollset_set* interested_parties() override {
@ -173,8 +164,8 @@ class PickFirst : public LoadBalancingPolicy {
}
private:
SubchannelData* subchannel_data_;
RefCountedPtr<SubchannelList> subchannel_list_;
const size_t index_;
};
// This method will be invoked once soon after instantiation to report
@ -193,6 +184,7 @@ class PickFirst : public LoadBalancingPolicy {
// Backpointer to owning subchannel list. Not owned.
SubchannelList* subchannel_list_;
const size_t index_;
// The subchannel.
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
@ -205,7 +197,8 @@ class PickFirst : public LoadBalancingPolicy {
};
SubchannelList(RefCountedPtr<PickFirst> policy,
EndpointAddressesList addresses, const ChannelArgs& args);
EndpointAddressesIterator* addresses,
const ChannelArgs& args);
~SubchannelList() override;
@ -413,9 +406,9 @@ void PickFirst::ResetBackoffLocked() {
void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Create a subchannel list from latest_update_args_.
EndpointAddressesList addresses;
EndpointAddressesIterator* addresses = nullptr;
if (latest_update_args_.addresses.ok()) {
addresses = *latest_update_args_.addresses;
addresses = latest_update_args_.addresses->get();
}
// Replace latest_pending_subchannel_list_.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
@ -425,7 +418,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
Ref(), std::move(addresses), latest_update_args_.args);
Ref(), addresses, latest_update_args_.args);
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->size() == 0) {
@ -483,9 +476,7 @@ class AddressFamilyIterator {
absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
args.addresses->size());
gpr_log(GPR_INFO, "Pick First %p received update", this);
} else {
gpr_log(GPR_INFO, "Pick First %p received update with address error: %s",
this, args.addresses.status().ToString().c_str());
@ -495,13 +486,18 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
absl::Status status;
if (!args.addresses.ok()) {
status = args.addresses.status();
} else if (args.addresses->empty()) {
} else {
EndpointAddressesList endpoints;
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
endpoints.push_back(endpoint);
});
if (endpoints.empty()) {
status = absl::UnavailableError("address list must not be empty");
} else {
// Shuffle the list if needed.
auto config = static_cast<PickFirstConfig*>(args.config.get());
if (config->shuffle_addresses()) {
absl::c_shuffle(*args.addresses, bit_gen_);
absl::c_shuffle(endpoints, bit_gen_);
}
// Flatten the list so that we have one address per endpoint.
// While we're iterating, also determine the desired address family
@ -509,19 +505,21 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
// the interleaving below.
std::set<absl::string_view> address_families;
std::vector<AddressFamilyIterator> address_family_order;
EndpointAddressesList endpoints;
for (const auto& endpoint : *args.addresses) {
EndpointAddressesList flattened_endpoints;
for (const auto& endpoint : endpoints) {
for (const auto& address : endpoint.addresses()) {
endpoints.emplace_back(address, endpoint.args());
flattened_endpoints.emplace_back(address, endpoint.args());
if (IsPickFirstHappyEyeballsEnabled()) {
absl::string_view scheme = GetAddressFamily(address);
bool inserted = address_families.insert(scheme).second;
if (inserted) {
address_family_order.emplace_back(scheme, endpoints.size() - 1);
address_family_order.emplace_back(scheme,
flattened_endpoints.size() - 1);
}
}
}
}
endpoints = std::move(flattened_endpoints);
// Interleave addresses as per RFC-8305 section 4.
if (IsPickFirstHappyEyeballsEnabled()) {
EndpointAddressesList interleaved_endpoints;
@ -537,9 +535,10 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
} while (endpoint == nullptr);
interleaved_endpoints.emplace_back(std::move(*endpoint));
}
args.addresses = std::move(interleaved_endpoints);
} else {
args.addresses = std::move(endpoints);
endpoints = std::move(interleaved_endpoints);
}
args.addresses =
std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
}
}
// If the update contains a resolver error and we have a previous update
@ -617,18 +616,20 @@ void PickFirst::HealthWatcher::OnConnectivityStateChange(
//
PickFirst::SubchannelList::SubchannelData::SubchannelData(
SubchannelList* subchannel_list,
SubchannelList* subchannel_list, size_t index,
RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {
: subchannel_list_(subchannel_list),
index_(index),
subchannel_(std::move(subchannel)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"[PF %p] subchannel list %p index %" PRIuPTR
" (subchannel %p): starting watch",
subchannel_list_->policy_.get(), subchannel_list_,
subchannel_list_->size(), subchannel_.get());
subchannel_list_->policy_.get(), subchannel_list_, index_,
subchannel_.get());
}
auto watcher = std::make_unique<Watcher>(
this, subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
pending_watcher_ = watcher.get();
subchannel_->WatchConnectivityState(std::move(watcher));
}
@ -639,7 +640,7 @@ void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
gpr_log(GPR_INFO,
"[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): cancelling watch and unreffing subchannel",
subchannel_list_->policy_.get(), subchannel_list_, Index(),
subchannel_list_->policy_.get(), subchannel_list_, index_,
subchannel_list_->size(), subchannel_.get());
}
subchannel_->CancelConnectivityStateWatch(pending_watcher_);
@ -659,7 +660,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
"status=%s, shutting_down=%d, pending_watcher=%p, "
"seen_transient_failure=%d, p->selected_=%p, "
"p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p",
p, subchannel_list_, Index(), subchannel_list_->size(),
p, subchannel_list_, index_, subchannel_list_->size(),
subchannel_.get(),
(connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
@ -771,7 +772,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
if (!IsPickFirstHappyEyeballsEnabled()) {
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list_->attempting_index_) return;
if (index_ != subchannel_list_->attempting_index_) return;
// React to the connectivity state.
ReactToConnectivityStateLocked();
return;
@ -784,7 +785,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
if (!prev_seen_transient_failure && seen_transient_failure_) {
// If a connection attempt fails before the timer fires, then
// cancel the timer and start connecting on the next subchannel.
if (Index() == subchannel_list_->attempting_index_) {
if (index_ == subchannel_list_->attempting_index_) {
if (subchannel_list_->timer_handle_.has_value()) {
p->channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_list_->timer_handle_);
@ -858,7 +859,7 @@ void PickFirst::SubchannelList::SubchannelData::
// We skip subchannels in state TRANSIENT_FAILURE to avoid a
// large recursion that could overflow the stack.
SubchannelData* found_subchannel = nullptr;
for (size_t next_index = Index() + 1;
for (size_t next_index = index_ + 1;
next_index < subchannel_list_->size(); ++next_index) {
SubchannelData* sc = &subchannel_list_->subchannels_[next_index];
GPR_ASSERT(sc->connectivity_state_.has_value());
@ -946,14 +947,14 @@ void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
}
// If this is not the last subchannel in the list, start the timer.
if (Index() != subchannel_list_->size() - 1) {
if (index_ != subchannel_list_->size() - 1) {
PickFirst* p = subchannel_list_->policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p: starting Connection "
"Attempt Delay timer for %" PRId64 "ms for index %" PRIuPTR,
p, subchannel_list_, p->connection_attempt_delay_.millis(),
Index());
index_);
}
subchannel_list_->timer_handle_ =
p->channel_control_helper()->GetEventEngine()->RunAfter(
@ -1041,7 +1042,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
}
// Unref all other subchannels in the list.
for (size_t i = 0; i < subchannel_list_->size(); ++i) {
if (i != Index()) {
if (i != index_) {
subchannel_list_->subchannels_[i].ShutdownLocked();
}
}
@ -1052,7 +1053,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
//
PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
EndpointAddressesList addresses,
EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: InternallyRefCounted<SubchannelList>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
@ -1062,14 +1063,12 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
.Remove(
GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"[PF %p] Creating subchannel list %p for %" PRIuPTR
" subchannels - channel args: %s",
policy_.get(), this, addresses.size(), args_.ToString().c_str());
gpr_log(GPR_INFO, "[PF %p] Creating subchannel list %p - channel args: %s",
policy_.get(), this, args_.ToString().c_str());
}
subchannels_.reserve(addresses.size());
if (addresses == nullptr) return;
// Create a subchannel for each address.
for (const EndpointAddresses& address : addresses) {
addresses->ForEach([&](const EndpointAddresses& address) {
GPR_ASSERT(address.addresses().size() == 1);
RefCountedPtr<SubchannelInterface> subchannel =
policy_->channel_control_helper()->CreateSubchannel(
@ -1081,7 +1080,7 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
"[PF %p] could not create subchannel for address %s, ignoring",
policy_.get(), address.ToString().c_str());
}
continue;
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
@ -1090,8 +1089,8 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
policy_.get(), this, subchannels_.size(), subchannel.get(),
address.ToString().c_str());
}
subchannels_.emplace_back(this, std::move(subchannel));
}
subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
});
}
PickFirst::SubchannelList::~SubchannelList() {

@ -684,7 +684,8 @@ absl::Status PriorityLb::ChildPriority::UpdateLocked(
if (priority_policy_->addresses_.ok()) {
auto it = priority_policy_->addresses_->find(name_);
if (it == priority_policy_->addresses_->end()) {
update_args.addresses.emplace();
update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
EndpointAddressesList());
} else {
update_args.addresses = it->second;
}

@ -554,7 +554,8 @@ void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses.emplace().emplace_back(ring_hash_->endpoints_[index_]);
update_args.addresses =
std::make_shared<SingleEndpointIterator>(ring_hash_->endpoints_[index_]);
update_args.args = ring_hash_->args_;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
@ -622,18 +623,14 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
// Check address list.
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[RH %p] received update", this);
}
// De-dup endpoints, taking weight into account.
endpoints_.clear();
endpoints_.reserve(args.addresses->size());
std::map<EndpointAddressSet, size_t> endpoint_indices;
size_t num_skipped = 0;
for (size_t i = 0; i < args.addresses->size(); ++i) {
EndpointAddresses& endpoint = (*args.addresses)[i];
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
const EndpointAddressSet key(endpoint.addresses());
auto p = endpoint_indices.emplace(key, i - num_skipped);
auto p = endpoint_indices.emplace(key, endpoints_.size());
if (!p.second) {
// Duplicate endpoint. Combine weights and skip the dup.
EndpointAddresses& prev_endpoint = endpoints_[p.first->second];
@ -651,11 +648,10 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
prev_endpoint.addresses(),
prev_endpoint.args().Set(GRPC_ARG_ADDRESS_WEIGHT,
weight_arg + prev_weight_arg));
++num_skipped;
} else {
endpoints_.push_back(std::move(endpoint));
}
endpoints_.push_back(endpoint);
}
});
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",

@ -707,7 +707,7 @@ class RlsLb : public LoadBalancingPolicy {
OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
// Accessed only from within WorkSerializer.
absl::StatusOr<EndpointAddressesList> addresses_;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses_;
ChannelArgs channel_args_;
RefCountedPtr<RlsLbConfig> config_;
RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
@ -1858,6 +1858,27 @@ RlsLb::RlsLb(Args args) : LoadBalancingPolicy(std::move(args)), cache_(this) {
}
}
bool EndpointsEqual(
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>
endpoints2) {
if (endpoints1.status() != endpoints2.status()) return false;
if (endpoints1.ok()) {
std::vector<EndpointAddresses> e1_list;
(*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) {
e1_list.push_back(endpoint);
});
size_t i = 0;
bool different = false;
(*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) {
if (endpoint != e1_list[i++]) different = true;
});
if (different) return false;
if (i != e1_list.size()) return false;
}
return true;
}
absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
@ -1875,7 +1896,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
// Swap out addresses.
// If the new address list is an error and we have an existing address list,
// stick with the existing addresses.
absl::StatusOr<EndpointAddressesList> old_addresses;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> old_addresses;
if (args.addresses.ok()) {
old_addresses = std::move(addresses_);
addresses_ = std::move(args.addresses);
@ -1888,7 +1909,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
bool update_child_policies =
old_config == nullptr ||
old_config->child_policy_config() != config_->child_policy_config() ||
old_addresses != addresses_ || args.args != channel_args_;
!EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_;
// If default target changes, swap out child policy.
bool created_default_child = false;
if (old_config == nullptr ||

@ -125,14 +125,14 @@ class OldRoundRobin : public LoadBalancingPolicy {
: public SubchannelList<RoundRobinSubchannelList,
RoundRobinSubchannelData> {
public:
RoundRobinSubchannelList(OldRoundRobin* policy, ServerAddressList addresses,
RoundRobinSubchannelList(OldRoundRobin* policy,
EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
? "RoundRobinSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
addresses, policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -277,13 +277,12 @@ void OldRoundRobin::ResetBackoffLocked() {
}
absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) {
ServerAddressList addresses;
EndpointAddressesIterator* addresses = nullptr;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[RR %p] received update", this);
}
addresses = std::move(*args.addresses);
addresses = args.addresses->get();
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
@ -299,8 +298,8 @@ absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeRefCounted<RoundRobinSubchannelList>(
this, std::move(addresses), args.args);
latest_pending_subchannel_list_ =
MakeRefCounted<RoundRobinSubchannelList>(this, addresses, args.args);
latest_pending_subchannel_list_->StartWatchingLocked(args.args);
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
@ -524,7 +523,7 @@ class RoundRobin : public LoadBalancingPolicy {
class RoundRobinEndpointList : public EndpointList {
public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
const EndpointAddressesList& endpoints,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
: EndpointList(std::move(round_robin),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
@ -687,13 +686,12 @@ void RoundRobin::ResetBackoffLocked() {
}
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
EndpointAddressesList addresses;
EndpointAddressesIterator* addresses = nullptr;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " endpoints",
this, args.addresses->size());
gpr_log(GPR_INFO, "[RR %p] received update", this);
}
addresses = std::move(*args.addresses);
addresses = args.addresses->get();
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
@ -710,8 +708,7 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
latest_pending_endpoint_list_.get());
}
latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), std::move(addresses),
args.args);
Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), addresses, args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
if (latest_pending_endpoint_list_->size() == 0) {

@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -208,7 +209,7 @@ class SubchannelList : public DualRefCounted<SubchannelListType> {
protected:
SubchannelList(LoadBalancingPolicy* policy, const char* tracer,
ServerAddressList addresses,
EndpointAddressesIterator* addresses,
LoadBalancingPolicy::ChannelControlHelper* helper,
const ChannelArgs& args);
@ -365,19 +366,18 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, const char* tracer,
ServerAddressList addresses,
EndpointAddressesIterator* addresses,
LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args)
: DualRefCounted<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer) {
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
tracer_, policy, this, addresses.size());
gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p", tracer_, policy,
this);
}
subchannels_.reserve(addresses.size());
if (addresses == nullptr) return;
// Create a subchannel for each address.
for (ServerAddress address : addresses) {
addresses->ForEach([&](const EndpointAddresses& address) {
RefCountedPtr<SubchannelInterface> subchannel =
helper->CreateSubchannel(address.address(), address.args(), args);
if (subchannel == nullptr) {
@ -387,7 +387,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
"[%s %p] could not create subchannel for address %s, ignoring",
tracer_, policy_, address.ToString().c_str());
}
continue;
return;
}
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO,
@ -397,8 +397,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
address.ToString().c_str());
}
subchannels_.emplace_back();
subchannels_.back().Init(this, std::move(address), std::move(subchannel));
}
subchannels_.back().Init(this, address, std::move(subchannel));
});
}
template <typename SubchannelListType, typename SubchannelDataType>

@ -247,14 +247,13 @@ class OldWeightedRoundRobin : public LoadBalancingPolicy {
WeightedRoundRobinSubchannelData> {
public:
WeightedRoundRobinSubchannelList(OldWeightedRoundRobin* policy,
ServerAddressList addresses,
EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WeightedRoundRobinSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
addresses, policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -675,11 +674,10 @@ void OldWeightedRoundRobin::ResetBackoffLocked() {
absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
config_ = std::move(args.config);
ServerAddressList addresses;
std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[WRR %p] received update", this);
}
// Weed out duplicate addresses. Also sort the addresses so that if
// the set of the addresses don't change, their indexes in the
@ -698,10 +696,12 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
}
};
std::set<ServerAddress, AddressLessThan> ordered_addresses(
args.addresses->begin(), args.addresses->end());
addresses =
ServerAddressList(ordered_addresses.begin(), ordered_addresses.end());
std::set<ServerAddress, AddressLessThan> ordered_addresses;
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
ordered_addresses.insert(endpoint);
});
addresses = std::make_shared<EndpointAddressesListIterator>(
ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
@ -718,8 +718,8 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ =
MakeRefCounted<WeightedRoundRobinSubchannelList>(
this, std::move(addresses), args.args);
MakeRefCounted<WeightedRoundRobinSubchannelList>(this, addresses.get(),
args.args);
latest_pending_subchannel_list_->StartWatchingLocked(args.args);
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
@ -1079,7 +1079,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
WrrEndpointList(RefCountedPtr<WeightedRoundRobin> wrr,
const EndpointAddressesList& endpoints,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
: EndpointList(std::move(wrr),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
@ -1516,11 +1516,10 @@ void WeightedRoundRobin::ResetBackoffLocked() {
absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
config_ = std::move(args.config);
EndpointAddressesList addresses;
std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[WRR %p] received update", this);
}
// Weed out duplicate endpoints. Also sort the endpoints so that if
// the set of endpoints doesn't change, their indexes in the endpoint
@ -1539,10 +1538,13 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return e1 < e2;
}
};
std::set<EndpointAddresses, EndpointAddressesLessThan> ordered_addresses(
args.addresses->begin(), args.addresses->end());
addresses = EndpointAddressesList(ordered_addresses.begin(),
ordered_addresses.end());
std::set<EndpointAddresses, EndpointAddressesLessThan> ordered_addresses;
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
ordered_addresses.insert(endpoint);
});
addresses =
std::make_shared<EndpointAddressesListIterator>(EndpointAddressesList(
ordered_addresses.begin(), ordered_addresses.end()));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
@ -1559,7 +1561,7 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
this, latest_pending_endpoint_list_.get());
}
latest_pending_endpoint_list_ =
MakeOrphanable<WrrEndpointList>(Ref(), std::move(addresses), args.args);
MakeOrphanable<WrrEndpointList>(Ref(), addresses.get(), args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
if (latest_pending_endpoint_list_->size() == 0) {

@ -157,10 +157,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void Orphan() override;
absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<EndpointAddressesList> addresses,
const std::string& resolution_note,
const ChannelArgs& args);
absl::Status UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
const std::string& resolution_note, const ChannelArgs& args);
void ResetBackoffLocked();
void DeactivateLocked();
@ -338,11 +338,12 @@ absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
target = MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name);
}
absl::StatusOr<EndpointAddressesList> addresses;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
if (address_map.ok()) {
auto it = address_map->find(name);
if (it == address_map->end()) {
addresses.emplace();
addresses = std::make_shared<EndpointAddressesListIterator>(
EndpointAddressesList());
} else {
addresses = std::move(it->second);
}
@ -589,7 +590,7 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<EndpointAddressesList> addresses,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
const std::string& resolution_note, const ChannelArgs& args) {
if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.

@ -199,8 +199,24 @@ class CdsLb : public LoadBalancingPolicy {
// The root of the tree is config_->cluster().
std::map<std::string, WatcherState> watchers_;
// TODO(roth, yashkt): These are here because we need to handle
// pollset_set linkage as clusters are added or removed from the
// XdsCertificateProvider. However, in the aggregate cluster case,
// there may be multiple clusters in the same cert provider, and we're
// only tracking the cert providers for the most recent underlying
// cluster here. I think this is a bug that could cause us to starve
// the underlying cert providers of polling. However, it is not
// actually causing any problem in practice today, because (a) we have
// no cert provider impl that relies on gRPC's polling and (b)
// probably no one is actually configuring an aggregate cluster with
// different cert providers in different underlying clusters.
// Hopefully, this problem won't be an issue in practice until after
// the EventEngine migration is done, at which point the need for
// handling pollset_set linkage will go away, and these fields can
// simply be removed.
RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
// Child LB policy.

@ -250,7 +250,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
absl::Status UpdateChildPolicyLocked(
absl::StatusOr<EndpointAddressesList> addresses,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
std::string resolution_note, const ChannelArgs& args);
void MaybeUpdatePickerLocked();
@ -569,7 +569,7 @@ OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
}
absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
absl::StatusOr<EndpointAddressesList> addresses,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
std::string resolution_note, const ChannelArgs& args) {
// Create policy if needed.
if (child_policy_ == nullptr) {

@ -149,7 +149,8 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
absl::Status UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const absl::StatusOr<EndpointAddressesList>& addresses,
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>&
addresses,
const ChannelArgs& args);
void ExitIdleLocked();
void ResetBackoffLocked();
@ -482,7 +483,7 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const absl::StatusOr<EndpointAddressesList>& addresses,
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>& addresses,
const ChannelArgs& args) {
if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.

@ -28,6 +28,7 @@
#include <utility>
#include <vector>
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -61,6 +62,7 @@
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -390,7 +392,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
absl::Status UpdateChildPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
EndpointAddressesList CreateChildPolicyAddressesLocked();
std::shared_ptr<EndpointAddressesIterator> CreateChildPolicyAddressesLocked();
std::string CreateChildPolicyResolutionNoteLocked();
RefCountedPtr<Config> CreateChildPolicyConfigLocked();
ChannelArgs CreateChildPolicyArgsLocked(const ChannelArgs& args_in);
@ -529,10 +531,16 @@ XdsClusterResolverLb::DiscoveryMechanismEntry::config() const {
->config_->discovery_mechanisms()[discovery_mechanism->index()];
}
std::string MakeChildPolicyName(absl::string_view cluster_name,
size_t child_number) {
return absl::StrCat("{cluster=", cluster_name,
", child_number=", child_number, "}");
}
std::string XdsClusterResolverLb::DiscoveryMechanismEntry::GetChildPolicyName(
size_t priority) const {
return absl::StrCat("{cluster=", config().cluster_name,
", child_number=", priority_child_numbers[priority], "}");
return MakeChildPolicyName(config().cluster_name,
priority_child_numbers[priority]);
}
//
@ -768,15 +776,37 @@ void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index,
// child policy-related methods
//
EndpointAddressesList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
EndpointAddressesList addresses;
for (const auto& discovery_entry : discovery_mechanisms_) {
const auto& priority_list =
GetUpdatePriorityList(*discovery_entry.latest_update);
class PriorityEndpointIterator : public EndpointAddressesIterator {
public:
struct DiscoveryMechanismResult {
std::shared_ptr<const XdsEndpointResource> update;
std::string cluster_name;
std::vector<size_t /*child_number*/> priority_child_numbers;
DiscoveryMechanismResult(
std::shared_ptr<const XdsEndpointResource> resource,
std::string cluster, std::vector<size_t> child_numbers)
: update(std::move(resource)),
cluster_name(std::move(cluster)),
priority_child_numbers(std::move(child_numbers)) {}
std::string GetChildPolicyName(size_t priority) const {
return MakeChildPolicyName(cluster_name,
priority_child_numbers[priority]);
}
};
explicit PriorityEndpointIterator(
std::vector<DiscoveryMechanismResult> results)
: results_(std::move(results)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (const auto& entry : results_) {
const auto& priority_list = GetUpdatePriorityList(*entry.update);
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& priority_entry = priority_list[priority];
std::string priority_child_name =
discovery_entry.GetChildPolicyName(priority);
std::string priority_child_name = entry.GetChildPolicyName(priority);
for (const auto& p : priority_entry.localities) {
const auto& locality_name = p.first;
const auto& locality = p.second;
@ -789,18 +819,33 @@ EndpointAddressesList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
uint32_t endpoint_weight =
locality.lb_weight *
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
addresses.emplace_back(
callback(EndpointAddresses(
endpoint.addresses(),
endpoint.args()
.SetObject(hierarchical_path_attr)
.Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
.SetObject(locality_name->Ref())
.Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight));
.Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)));
}
}
}
}
}
private:
std::vector<DiscoveryMechanismResult> results_;
};
std::shared_ptr<EndpointAddressesIterator>
XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
std::vector<PriorityEndpointIterator::DiscoveryMechanismResult> entries;
entries.reserve(discovery_mechanisms_.size());
for (const auto& discovery_entry : discovery_mechanisms_) {
entries.emplace_back(discovery_entry.latest_update,
discovery_entry.config().cluster_name,
discovery_entry.priority_child_numbers);
}
return addresses;
return std::make_shared<PriorityEndpointIterator>(std::move(entries));
}
std::string XdsClusterResolverLb::CreateChildPolicyResolutionNoteLocked() {

@ -18,7 +18,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h"
#include <inttypes.h>
#include <stddef.h>
#include <algorithm>
@ -34,6 +33,7 @@
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -300,8 +300,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked();
absl::StatusOr<EndpointAddressesList> UpdateAddressMap(
absl::StatusOr<EndpointAddressesList> endpoints);
void UpdateAddressMap(const EndpointAddressesIterator& endpoints);
RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
const grpc_resolved_address& address,
@ -508,12 +507,36 @@ void XdsOverrideHostLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
// Wraps the endpoint iterator and filters out endpoints in state DRAINING.
class ChildEndpointIterator : public EndpointAddressesIterator {
public:
explicit ChildEndpointIterator(
std::shared_ptr<EndpointAddressesIterator> parent_it)
: parent_it_(std::move(parent_it)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] Received update with %" PRIuPTR
" addresses",
this, args.addresses.ok() ? args.addresses->size() : 0);
"[xds_override_host_lb %p] endpoint %s: not draining, "
"passing to child",
this, endpoint.ToString().c_str());
}
callback(endpoint);
}
});
}
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
};
absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this);
}
auto old_config = std::move(config_);
// Update config.
@ -521,13 +544,24 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (config_ == nullptr) {
return absl::InvalidArgumentError("Missing policy config");
}
// Update address map and wrap endpoint iterator for child policy.
if (args.addresses.ok()) {
UpdateAddressMap(**args.addresses);
args.addresses =
std::make_shared<ChildEndpointIterator>(std::move(*args.addresses));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
args.addresses.status().ToString().c_str());
}
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
// Update child policy.
UpdateArgs update_args;
update_args.addresses = UpdateAddressMap(std::move(args.addresses));
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_config();
update_args.args = std::move(args.args);
@ -578,18 +612,9 @@ OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
return lb_policy;
}
absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
absl::StatusOr<EndpointAddressesList> endpoints) {
if (!endpoints.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
endpoints.status().ToString().c_str());
}
return endpoints;
}
// Construct the list of addresses to pass to the child policy and a
// map of address info from which to update subchannel_map_.
EndpointAddressesList child_addresses;
void XdsOverrideHostLb::UpdateAddressMap(
const EndpointAddressesIterator& endpoints) {
// Construct a map of address info from which to update subchannel_map_.
struct AddressInfo {
XdsHealthStatus eds_health_status;
RefCountedStringValue address_list;
@ -597,25 +622,18 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
: eds_health_status(status), address_list(std::move(addresses)) {}
};
std::map<const std::string, AddressInfo> addresses_for_map;
for (const auto& endpoint : *endpoints) {
endpoints.ForEach([&](const EndpointAddresses& endpoint) {
XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: not draining, "
"passing to child",
this, endpoint.ToString().c_str());
}
child_addresses.push_back(endpoint);
} else if (!config_->override_host_status_set().Contains(status)) {
// Skip draining hosts if not in the override status set.
if (status.status() == XdsHealthStatus::kDraining &&
!config_->override_host_status_set().Contains(status)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: draining but not in "
"override_host_status set -- ignoring",
this, endpoint.ToString().c_str());
}
continue;
return;
}
std::vector<std::string> addresses;
addresses.reserve(endpoint.addresses().size());
@ -641,7 +659,7 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
std::piecewise_construct, std::forward_as_tuple(addresses[i]),
std::forward_as_tuple(status, std::move(address_list)));
}
}
});
// Now grab the lock and update subchannel_map_ from addresses_for_map.
{
MutexLock lock(&subchannel_map_mu_);
@ -688,7 +706,6 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
it->second.set_address_list(std::move(address_info.address_list));
}
}
return child_addresses;
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>

@ -21,7 +21,6 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -170,10 +169,10 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
// Scan the addresses to find the weight for each locality.
std::map<std::string, uint32_t> locality_weights;
if (args.addresses.ok()) {
for (const auto& address : *args.addresses) {
auto* locality_name = address.args().GetObject<XdsLocalityName>();
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
auto* locality_name = endpoint.args().GetObject<XdsLocalityName>();
uint32_t weight =
address.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
if (locality_name != nullptr && weight > 0) {
auto p = locality_weights.emplace(
locality_name->AsHumanReadableString(), weight);
@ -184,7 +183,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
p.first->first.c_str(), p.first->second, weight);
}
}
}
});
}
// Construct the config for the weighted_target policy.
Json::Object weighted_targets;

@ -30,7 +30,6 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/thd.h"
static thread_local bool g_timer_thread;
@ -67,41 +66,32 @@ bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
}
void TimerManager::MainLoop() {
for (;;) {
grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
absl::optional<std::vector<experimental::EventEngine::Closure*>>
check_result = timer_list_->TimerCheck(&next);
GPR_ASSERT(check_result.has_value() &&
"ERROR: More than one MainLoop is running.");
if (!check_result->empty()) {
bool timers_found = !check_result->empty();
if (timers_found) {
RunSomeTimers(std::move(*check_result));
continue;
}
if (!WaitUntil(next)) break;
}
thread_pool_->Run([this, next, timers_found]() {
if (!timers_found && !WaitUntil(next)) {
main_loop_exit_signal_->Notify();
return;
}
MainLoop();
});
}
bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
void TimerManager::StartMainLoopThread() {
main_thread_ = grpc_core::Thread(
"timer_manager",
[](void* arg) {
auto self = static_cast<TimerManager*>(arg);
self->MainLoop();
},
this, nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false));
main_thread_.Start();
}
TimerManager::TimerManager(
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
: host_(this), thread_pool_(std::move(thread_pool)) {
timer_list_ = std::make_unique<TimerList>(&host_);
main_loop_exit_signal_.emplace();
StartMainLoopThread();
thread_pool_->Run([this]() { MainLoop(); });
}
grpc_core::Timestamp TimerManager::Host::Now() {
@ -162,7 +152,7 @@ void TimerManager::RestartPostFork() {
}
shutdown_ = false;
main_loop_exit_signal_.emplace();
StartMainLoopThread();
thread_pool_->Run([this]() { MainLoop(); });
}
void TimerManager::PrepareFork() { Shutdown(); }

@ -36,7 +36,6 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
@ -80,7 +79,6 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
TimerManager* const timer_manager_;
};
void StartMainLoopThread();
void RestartPostFork();
void MainLoop();
void RunSomeTimers(std::vector<experimental::EventEngine::Closure*> timers);
@ -103,7 +101,6 @@ class TimerManager final : public grpc_event_engine::experimental::Forkable {
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = false;
// actual timer implementation
std::unique_ptr<TimerList> timer_list_;
grpc_core::Thread main_thread_;
std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool_;
absl::optional<grpc_core::Notification> main_loop_exit_signal_;
};

@ -104,6 +104,19 @@ inline bool operator==(const RefCountedStringValue& lhs,
return lhs.as_string_view() == rhs.as_string_view();
}
inline bool operator!=(const RefCountedStringValue& lhs,
absl::string_view rhs) {
return lhs.as_string_view() != rhs;
}
inline bool operator!=(absl::string_view lhs,
const RefCountedStringValue& rhs) {
return lhs != rhs.as_string_view();
}
inline bool operator!=(const RefCountedStringValue& lhs,
const RefCountedStringValue& rhs) {
return lhs.as_string_view() != rhs.as_string_view();
}
inline bool operator<(const RefCountedStringValue& lhs, absl::string_view rhs) {
return lhs.as_string_view() < rhs;
}

@ -346,7 +346,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
struct UpdateArgs {
/// A list of endpoints, each with one or more address, or an error
/// indicating a failure to obtain the list of addresses.
absl::StatusOr<EndpointAddressesList> addresses;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
/// The LB policy config.
RefCountedPtr<Config> config;
/// A human-readable note providing context about the name resolution that

@ -25,7 +25,6 @@
#include "absl/strings/str_join.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/crash.h"
namespace grpc_core {
@ -84,7 +83,23 @@ class FreestandingActivity::Handle final : public Wakeable {
}
void WakeupAsync(WakeupMask) override ABSL_LOCKS_EXCLUDED(mu_) {
Crash("not implemented");
mu_.Lock();
// Note that activity refcount can drop to zero, but we could win the lock
// against DropActivity, so we need to only increase activities refcount if
// it is non-zero.
if (activity_ && activity_->RefIfNonzero()) {
FreestandingActivity* activity = activity_;
mu_.Unlock();
// Activity still exists and we have a reference: wake it up, which will
// drop the ref.
activity->WakeupAsync(0);
} else {
// Could not get the activity - it's either gone or going. No need to wake
// it up!
mu_.Unlock();
}
// Drop the ref to the handle (we have one ref = one wakeup semantics).
Unref();
}
void Drop(WakeupMask) override { Unref(); }

@ -32,7 +32,6 @@
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/sync.h"
@ -502,7 +501,7 @@ class PromiseActivity final
// the activity to an external threadpool to run. If the activity is already
// running on this thread, a note is taken of such and the activity is
// repolled if it doesn't complete.
void Wakeup(WakeupMask) final {
void Wakeup(WakeupMask m) final {
// If there is an active activity, but hey it's us, flag that and we'll loop
// in RunLoop (that's calling from above here!).
if (Activity::is_current()) {
@ -511,6 +510,10 @@ class PromiseActivity final
WakeupComplete();
return;
}
WakeupAsync(m);
}
void WakeupAsync(WakeupMask) final {
if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
// Can't safely run, so ask to run later.
this->ScheduleWakeup();
@ -520,8 +523,6 @@ class PromiseActivity final
}
}
void WakeupAsync(WakeupMask) final { Crash("not implemented"); }
// Drop a wakeup
void Drop(WakeupMask) final { this->WakeupComplete(); }

@ -0,0 +1,98 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
#define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <string>
#include "absl/base/thread_annotations.h"
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/trace.h"
#include "src/core/lib/promise/wait_set.h"
namespace grpc_core {
// A latch providing true cross activity wakeups
template <typename T>
class InterActivityLatch;
template <>
class InterActivityLatch<void> {
public:
InterActivityLatch() = default;
InterActivityLatch(const InterActivityLatch&) = delete;
InterActivityLatch& operator=(const InterActivityLatch&) = delete;
// Produce a promise to wait for this latch.
auto Wait() {
return [this]() -> Poll<Empty> {
MutexLock lock(&mu_);
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
StateString().c_str());
}
if (is_set_) {
return Empty{};
} else {
return waiters_.AddPending(Activity::current()->MakeNonOwningWaker());
}
};
}
// Set the latch.
void Set() {
MutexLock lock(&mu_);
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
}
is_set_ = true;
waiters_.WakeupAsync();
}
bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) {
MutexLock lock(&mu_);
return is_set_;
}
private:
std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(),
" INTER_ACTIVITY_LATCH[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
}
std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return absl::StrCat("is_set:", is_set_);
}
mutable Mutex mu_;
// True if we have a value set, false otherwise.
bool is_set_ = false;
WaitSet waiters_ ABSL_GUARDED_BY(mu_);
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H

@ -37,6 +37,7 @@ namespace grpc_core {
// Initially the Latch is unset.
// It can be waited upon by the Wait method, which produces a Promise that
// resolves when the Latch is Set to a value of type T.
// Latches only work correctly within a single activity.
template <typename T>
class Latch {
public:
@ -204,6 +205,9 @@ class Latch<void> {
IntraActivityWaiter waiter_;
};
template <typename T>
using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait());
// A Latch that can have its value observed by outside threads, but only waited
// upon from inside a single activity.
template <typename T>
@ -268,9 +272,6 @@ class ExternallyObservableLatch<void> {
IntraActivityWaiter waiter_;
};
template <typename T>
using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait());
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_LATCH_H

@ -247,7 +247,7 @@ bool Party::RunParty() {
}
// Poll the participant.
currently_polling_ = i;
bool done = participant->Poll();
bool done = participant->PollParticipantPromise();
currently_polling_ = kNotPolling;
if (done) {
if (!name.empty()) {

@ -24,6 +24,7 @@
#include <string>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
@ -38,6 +39,7 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/trace.h"
#include "src/core/lib/resource_quota/arena.h"
@ -298,7 +300,7 @@ class Party : public Activity, private Wakeable {
explicit Participant(absl::string_view name) : name_(name) {}
// Poll the participant. Return true if complete.
// Participant should take care of its own deallocation in this case.
virtual bool Poll() = 0;
virtual bool PollParticipantPromise() = 0;
// Destroy the participant before finishing.
virtual void Destroy() = 0;
@ -330,6 +332,9 @@ class Party : public Activity, private Wakeable {
void Spawn(absl::string_view name, Factory promise_factory,
OnComplete on_complete);
template <typename Factory>
auto SpawnWaitable(absl::string_view name, Factory factory);
void Orphan() final { Crash("unused"); }
// Activity implementation: not allowed to be overridden by derived types.
@ -414,7 +419,7 @@ class Party : public Activity, private Wakeable {
}
}
bool Poll() override {
bool PollParticipantPromise() override {
if (!started_) {
auto p = factory_.Make();
Destruct(&factory_);
@ -441,6 +446,89 @@ class Party : public Activity, private Wakeable {
bool started_ = false;
};
template <typename SuppliedFactory>
class PromiseParticipantImpl final
: public RefCounted<PromiseParticipantImpl<SuppliedFactory>,
NonPolymorphicRefCount>,
public Participant {
using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>;
using Promise = typename Factory::Promise;
using Result = typename Promise::Result;
public:
PromiseParticipantImpl(absl::string_view name,
SuppliedFactory promise_factory)
: Participant(name) {
Construct(&factory_, std::move(promise_factory));
}
~PromiseParticipantImpl() {
switch (state_.load(std::memory_order_acquire)) {
case State::kFactory:
Destruct(&factory_);
break;
case State::kPromise:
Destruct(&promise_);
break;
case State::kResult:
Destruct(&result_);
break;
}
}
// Inside party poll: drive from factory -> promise -> result
bool PollParticipantPromise() override {
switch (state_.load(std::memory_order_relaxed)) {
case State::kFactory: {
auto p = factory_.Make();
Destruct(&factory_);
Construct(&promise_, std::move(p));
state_.store(State::kPromise, std::memory_order_relaxed);
}
ABSL_FALLTHROUGH_INTENDED;
case State::kPromise: {
auto p = promise_();
if (auto* r = p.value_if_ready()) {
Destruct(&promise_);
Construct(&result_, std::move(*r));
state_.store(State::kResult, std::memory_order_release);
waiter_.Wakeup();
this->Unref();
return true;
}
return false;
}
case State::kResult:
Crash(
"unreachable: promises should not be repolled after completion");
}
}
// Outside party poll: check whether the spawning party has completed this
// promise.
Poll<Result> PollCompletion() {
switch (state_.load(std::memory_order_acquire)) {
case State::kFactory:
case State::kPromise:
return Pending{};
case State::kResult:
return std::move(result_);
}
}
void Destroy() override { this->Unref(); }
private:
enum class State : uint8_t { kFactory, kPromise, kResult };
union {
GPR_NO_UNIQUE_ADDRESS Factory factory_;
GPR_NO_UNIQUE_ADDRESS Promise promise_;
GPR_NO_UNIQUE_ADDRESS Result result_;
};
Waker waiter_{Activity::current()->MakeOwningWaker()};
std::atomic<State> state_{State::kFactory};
};
// Notification that the party has finished and this instance can be deleted.
// Derived types should arrange to call CancelRemainingParticipants during
// this sequence.
@ -502,6 +590,17 @@ void Party::Spawn(absl::string_view name, Factory promise_factory,
std::move(on_complete));
}
template <typename Factory>
auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) {
auto participant = MakeRefCounted<PromiseParticipantImpl<Factory>>(
name, std::move(promise_factory));
Participant* p = participant->Ref().release();
AddParticipants(&p, 1);
return [participant = std::move(participant)]() mutable {
return participant->PollCompletion();
};
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_PARTY_H

@ -69,6 +69,12 @@ class WaitSet final {
return ret;
}
void WakeupAsync() {
while (!pending_.empty()) {
pending_.extract(pending_.begin()).value().WakeupAsync();
}
}
private:
// Handles to activities that need to be awoken.
WakerSet pending_;

@ -22,7 +22,6 @@
#include <string.h>
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

@ -23,8 +23,11 @@
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/functional/function_ref.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolved_address.h"
@ -64,6 +67,9 @@ class EndpointAddresses {
bool operator==(const EndpointAddresses& other) const {
return Cmp(other) == 0;
}
bool operator!=(const EndpointAddresses& other) const {
return Cmp(other) != 0;
}
bool operator<(const EndpointAddresses& other) const {
return Cmp(other) < 0;
}
@ -111,6 +117,48 @@ class EndpointAddressSet {
std::set<grpc_resolved_address, ResolvedAddressLessThan> addresses_;
};
// An iterator interface for endpoints.
class EndpointAddressesIterator {
public:
virtual ~EndpointAddressesIterator() = default;
// Invokes callback once for each endpoint.
virtual void ForEach(
absl::FunctionRef<void(const EndpointAddresses&)> callback) const = 0;
};
// Iterator over a fixed list of endpoints.
class EndpointAddressesListIterator : public EndpointAddressesIterator {
public:
explicit EndpointAddressesListIterator(EndpointAddressesList endpoints)
: endpoints_(std::move(endpoints)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (const auto& endpoint : endpoints_) {
callback(endpoint);
}
}
private:
EndpointAddressesList endpoints_;
};
// Iterator that returns only a single endpoint.
class SingleEndpointIterator : public EndpointAddressesIterator {
public:
explicit SingleEndpointIterator(EndpointAddresses endpoint)
: endpoint_(std::move(endpoint)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
callback(endpoint_);
}
private:
EndpointAddresses endpoint_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_RESOLVER_ENDPOINT_ADDRESSES_H

@ -23,6 +23,6 @@
#include <grpc/grpc.h>
const char* grpc_version_string(void) { return "36.0.0"; }
const char* grpc_version_string(void) { return "37.0.0"; }
const char* grpc_g_stands_for(void) { return "gjallarhorn"; }
const char* grpc_g_stands_for(void) { return "grand"; }

@ -1,7 +1,7 @@
<!-- This file is generated -->
<Project>
<PropertyGroup>
<GrpcCsharpVersion>2.60.0-dev</GrpcCsharpVersion>
<GrpcCsharpVersion>2.61.0-dev</GrpcCsharpVersion>
<GoogleProtobufVersion>3.25.0</GoogleProtobufVersion>
</PropertyGroup>
</Project>

@ -42,7 +42,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCCppPlugin'
v = '1.60.0-dev'
v = '1.61.0-dev'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates C++ files from .proto services.'
s.description = <<-DESC

@ -42,7 +42,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler-gRPCPlugin'
v = '1.60.0-dev'
v = '1.61.0-dev'
s.version = v
s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.'
s.description = <<-DESC

@ -22,4 +22,4 @@
// instead. This file can be regenerated from the template by running
// `tools/buildgen/generate_projects.sh`.
#define GRPC_OBJC_VERSION_STRING @"1.60.0-dev"
#define GRPC_OBJC_VERSION_STRING @"1.61.0-dev"

@ -18,7 +18,7 @@
#import "GRXConcurrentWriteable.h"
#import <RxLibrary/GRXWriteable.h>
#import "GRXWriteable.h"
@interface GRXConcurrentWriteable ()
// This is atomic so that cancellation can nillify it from any thread.

@ -16,7 +16,7 @@
*
*/
#import "RxLibrary/GRXForwardingWriter.h"
#import "GRXForwardingWriter.h"
/** A "proxy" writer that transforms all the values of its input writer by using a mapping function.
*/

@ -14,7 +14,7 @@
* limitations under the License.
*/
#import "GRPCBlockCallbackResponseHandler.h"
#import <GRPCClient/GRPCBlockCallbackResponseHandler.h>
@implementation GRPCBlockCallbackResponseHandler {
void (^_initialMetadataCallback)(NSDictionary *);

@ -22,5 +22,5 @@
// instead. This file can be regenerated from the template by running
// `tools/buildgen/generate_projects.sh`.
#define GRPC_OBJC_VERSION_STRING @"1.60.0-dev"
#define GRPC_C_VERSION_STRING @"36.0.0"
#define GRPC_OBJC_VERSION_STRING @"1.61.0-dev"
#define GRPC_C_VERSION_STRING @"37.0.0"

@ -2,7 +2,7 @@
"name": "grpc/grpc-dev",
"description": "gRPC library for PHP - for Development use only",
"license": "Apache-2.0",
"version": "1.60.0",
"version": "1.61.0",
"require": {
"php": ">=7.0.0",
"google/protobuf": "^v3.3.0"

@ -20,6 +20,6 @@
#ifndef VERSION_H
#define VERSION_H
#define PHP_GRPC_VERSION "1.60.0dev"
#define PHP_GRPC_VERSION "1.61.0dev"
#endif /* VERSION_H */

@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
licenses(["notice"])

@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
licenses(["notice"])

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!!
__version__ = """1.60.0.dev0"""
__version__ = """1.61.0.dev0"""

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_admin/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_channelz/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_csds/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_status/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'

@ -14,5 +14,5 @@
# GRPC contains the General RPC module.
module GRPC
VERSION = '1.60.0.dev'
VERSION = '1.61.0.dev'
end

@ -14,6 +14,6 @@
module GRPC
module NativeDebug
VERSION = '1.60.0.dev'
VERSION = '1.61.0.dev'
end
end

@ -14,6 +14,6 @@
module GRPC
module Tools
VERSION = '1.60.0.dev'
VERSION = '1.61.0.dev'
end
end

@ -773,7 +773,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
absl::Span<const EndpointAddresses> endpoints,
RefCountedPtr<LoadBalancingPolicy::Config> config) {
LoadBalancingPolicy::UpdateArgs update;
update.addresses.emplace(endpoints.begin(), endpoints.end());
update.addresses = std::make_shared<EndpointAddressesListIterator>(
EndpointAddressesList(endpoints.begin(), endpoints.end()));
update.config = std::move(config);
return update;
}

@ -99,11 +99,13 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
"HEALTHY"}) {
LoadBalancingPolicy::UpdateArgs update;
update.config = MakeXdsOverrideHostConfig(override_host_status);
update.addresses.emplace();
EndpointAddressesList endpoints;
for (auto address_and_status : addresses_and_statuses) {
update.addresses->push_back(MakeAddressWithHealthStatus(
endpoints.push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
update.addresses =
std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus());
}

@ -419,6 +419,27 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "inter_activity_latch_test",
srcs = ["inter_activity_latch_test.cc"],
external_deps = [
"absl/status",
"gtest",
],
language = "c++",
tags = ["promise_test"],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:grpc",
"//src/core:default_event_engine",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:inter_activity_latch",
"//src/core:notification",
"//src/core:seq",
],
)
grpc_cc_test(
name = "mpsc_test",
srcs = ["mpsc_test.cc"],
@ -591,6 +612,7 @@ grpc_cc_test(
"//src/core:context",
"//src/core:default_event_engine",
"//src/core:event_engine_memory_allocator",
"//src/core:inter_activity_latch",
"//src/core:memory_quota",
"//src/core:notification",
"//src/core:poll",

@ -0,0 +1,103 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/promise/inter_activity_latch.h"
#include "absl/status/status.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/seq.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
namespace grpc_core {
namespace {
TEST(InterActivityLatchTest, Works) {
InterActivityLatch<void> latch;
// Start some waiting activities
Notification n1;
auto a1 = MakeActivity(
[&] {
return Seq(latch.Wait(), [&](Empty) {
n1.Notify();
return absl::OkStatus();
});
},
EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {});
Notification n2;
auto a2 = MakeActivity(
[&] {
return Seq(latch.Wait(), [&](Empty) {
n2.Notify();
return absl::OkStatus();
});
},
EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {});
Notification n3;
auto a3 = MakeActivity(
[&] {
return Seq(latch.Wait(), [&](Empty) {
n3.Notify();
return absl::OkStatus();
});
},
EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {});
ASSERT_FALSE(n1.HasBeenNotified());
ASSERT_FALSE(n2.HasBeenNotified());
ASSERT_FALSE(n3.HasBeenNotified());
// Start a setting activity
auto kicker = MakeActivity(
[&] {
latch.Set();
return absl::OkStatus();
},
EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {});
// Start another waiting activity
Notification n4;
auto a4 = MakeActivity(
[&] {
return Seq(latch.Wait(), [&](Empty) {
n4.Notify();
return absl::OkStatus();
});
},
EventEngineWakeupScheduler{GetDefaultEventEngine()}, [](absl::Status) {});
// Everything should finish
n1.WaitForNotification();
n2.WaitForNotification();
n3.WaitForNotification();
n4.WaitForNotification();
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_init(); // for GetDefaultEventEngine
int r = RUN_ALL_TESTS();
grpc_shutdown();
return r;
}

@ -36,6 +36,7 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/inter_activity_latch.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/sleep.h"
@ -298,6 +299,31 @@ TEST_F(PartyTest, CanSpawnAndRun) {
n.WaitForNotification();
}
TEST_F(PartyTest, CanSpawnWaitableAndRun) {
auto party1 = MakeRefCounted<TestParty>();
auto party2 = MakeRefCounted<TestParty>();
Notification n;
InterActivityLatch<void> done;
// Spawn a task on party1 that will wait for a task on party2.
// The party2 task will wait on the latch `done`.
party1->Spawn(
"party1_main",
[&party2, &done]() {
return party2->SpawnWaitable("party2_main",
[&done]() { return done.Wait(); });
},
[&n](Empty) { n.Notify(); });
ASSERT_FALSE(n.HasBeenNotified());
party1->Spawn(
"party1_notify_latch",
[&done]() {
done.Set();
return Empty{};
},
[](Empty) {});
n.WaitForNotification();
}
TEST_F(PartyTest, CanSpawnFromSpawn) {
auto party = MakeRefCounted<TestParty>();
Notification n1;

@ -414,17 +414,19 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
config->address().c_str());
auto uri = URI::Parse(config->address());
args.config.reset();
args.addresses = EndpointAddressesList();
EndpointAddressesList addresses;
if (uri.ok()) {
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*uri, &address));
args.addresses->emplace_back(address, ChannelArgs());
addresses.emplace_back(address, ChannelArgs());
} else {
gpr_log(GPR_ERROR,
"%s: could not parse URI (%s), using empty address list",
kFixedAddressLbPolicyName, uri.status().ToString().c_str());
args.resolution_note = "no address in fixed_address_lb policy";
}
args.addresses =
std::make_shared<EndpointAddressesListIterator>(std::move(addresses));
return ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
}

@ -13,13 +13,10 @@
// limitations under the License.
//
// TODO(roth): Split this file up into a common test framework and a set
// of test files that use that framework. Need to figure out the best
// way to split up the tests. One option would be to split it up by xDS
// resource type; another approach would be to have all of the "core"
// xDS functionality in one file and then move specific features to
// their own files (e.g., mTLS security, fault injection, circuit
// breaking, etc).
// TODO(yashkt): Split this file up into (at least) the following pieces:
// - xDS-enabled server functionality
// - mTLS functionality on both client and server
// - RBAC
#include <deque>
#include <memory>
@ -324,34 +321,14 @@ class XdsSecurityTest : public XdsEnd2endTest {
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
}
// Sends CDS updates with the new security configuration and verifies that
// after propagation, this new configuration is used for connections. If \a
// identity_instance_name and \a root_instance_name are both empty,
// connections are expected to use fallback credentials.
void UpdateAndVerifyXdsSecurityConfiguration(
void MaybeSetUpstreamTlsContextOnCluster(
absl::string_view root_instance_name,
absl::string_view root_certificate_name,
absl::string_view identity_instance_name,
absl::string_view identity_certificate_name,
const std::vector<StringMatcher>& san_matchers,
const std::vector<std::string>& expected_authenticated_identity,
bool test_expects_failure = false) {
// Change the backend and use a unique service name to use so that we know
// that the CDS update was applied.
std::string service_name = absl::StrCat(
"eds_service_name",
absl::FormatTime("%H%M%E3S", absl::Now(), absl::LocalTimeZone()));
backend_index_ = (backend_index_ + 1) % 2;
EdsResourceArgs args({
{"locality0",
CreateEndpointsForBackends(backend_index_, backend_index_ + 1)},
});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, service_name.c_str()));
auto cluster = default_cluster_;
cluster.mutable_eds_cluster_config()->set_service_name(service_name);
const std::vector<StringMatcher>& san_matchers, Cluster* cluster) {
if (!identity_instance_name.empty() || !root_instance_name.empty()) {
auto* transport_socket = cluster.mutable_transport_socket();
auto* transport_socket = cluster->mutable_transport_socket();
transport_socket->set_name("envoy.transport_sockets.tls");
UpstreamTlsContext upstream_tls_context;
if (!identity_instance_name.empty()) {
@ -382,6 +359,37 @@ class XdsSecurityTest : public XdsEnd2endTest {
}
transport_socket->mutable_typed_config()->PackFrom(upstream_tls_context);
}
}
// Sends CDS updates with the new security configuration and verifies that
// after propagation, this new configuration is used for connections. If \a
// identity_instance_name and \a root_instance_name are both empty,
// connections are expected to use fallback credentials.
void UpdateAndVerifyXdsSecurityConfiguration(
absl::string_view root_instance_name,
absl::string_view root_certificate_name,
absl::string_view identity_instance_name,
absl::string_view identity_certificate_name,
const std::vector<StringMatcher>& san_matchers,
const std::vector<std::string>& expected_authenticated_identity,
bool test_expects_failure = false) {
// Change the backend and use a unique service name to use so that we know
// that the CDS update was applied.
std::string service_name = absl::StrCat(
"eds_service_name",
absl::FormatTime("%H%M%E3S", absl::Now(), absl::LocalTimeZone()));
backend_index_ = (backend_index_ + 1) % 2;
EdsResourceArgs args({
{"locality0",
CreateEndpointsForBackends(backend_index_, backend_index_ + 1)},
});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args, service_name.c_str()));
auto cluster = default_cluster_;
cluster.mutable_eds_cluster_config()->set_service_name(service_name);
MaybeSetUpstreamTlsContextOnCluster(
root_instance_name, root_certificate_name, identity_instance_name,
identity_certificate_name, san_matchers, &cluster);
balancer_->ads_service()->SetCdsResource(cluster);
// The updates might take time to have an effect, so use a retry loop.
if (test_expects_failure) {
@ -730,6 +738,63 @@ TEST_P(XdsSecurityTest, TestFileWatcherCertificateProvider) {
authenticated_identity_);
}
TEST_P(XdsSecurityTest, MtlsWithAggregateCluster) {
g_fake1_cert_data_map->Set({{"", {root_cert_, identity_pair_}}});
g_fake2_cert_data_map->Set({{"", {root_cert_, fallback_identity_pair_}}});
// Set up aggregate cluster.
const char* kNewCluster1Name = "new_cluster_1";
const char* kNewEdsService1Name = "new_eds_service_name_1";
const char* kNewCluster2Name = "new_cluster_2";
const char* kNewEdsService2Name = "new_eds_service_name_2";
// Populate new EDS resources.
EdsResourceArgs args1({
{"locality0", CreateEndpointsForBackends(0, 1)},
});
EdsResourceArgs args2({
{"locality0", CreateEndpointsForBackends(1, 2)},
});
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args1, kNewEdsService1Name));
balancer_->ads_service()->SetEdsResource(
BuildEdsResource(args2, kNewEdsService2Name));
// Populate new CDS resources.
Cluster new_cluster1 = default_cluster_;
new_cluster1.set_name(kNewCluster1Name);
new_cluster1.mutable_eds_cluster_config()->set_service_name(
kNewEdsService1Name);
MaybeSetUpstreamTlsContextOnCluster("fake_plugin1", "", "fake_plugin1", "",
{}, &new_cluster1);
balancer_->ads_service()->SetCdsResource(new_cluster1);
Cluster new_cluster2 = default_cluster_;
new_cluster2.set_name(kNewCluster2Name);
new_cluster2.mutable_eds_cluster_config()->set_service_name(
kNewEdsService2Name);
MaybeSetUpstreamTlsContextOnCluster("fake_plugin1", "", "fake_plugin2", "",
{}, &new_cluster2);
balancer_->ads_service()->SetCdsResource(new_cluster2);
// Create Aggregate Cluster
auto cluster = default_cluster_;
auto* custom_cluster = cluster.mutable_cluster_type();
custom_cluster->set_name("envoy.clusters.aggregate");
envoy::extensions::clusters::aggregate::v3::ClusterConfig cluster_config;
cluster_config.add_clusters(kNewCluster1Name);
cluster_config.add_clusters(kNewCluster2Name);
custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
balancer_->ads_service()->SetCdsResource(cluster);
// RPC should go to backend 0.
CheckRpcSendOk(DEBUG_LOCATION);
EXPECT_EQ(backends_[0]->backend_service()->request_count(), 1);
// Make sure the backend saw the right client identity.
EXPECT_EQ(backends_[0]->backend_service()->last_peer_identity(),
authenticated_identity_);
// Now stop backend 0 and wait for backend 1.
ShutdownBackend(0);
WaitForBackend(DEBUG_LOCATION, 1);
// Make sure the backend saw the right client identity.
EXPECT_EQ(backends_[1]->backend_service()->last_peer_identity(),
fallback_authenticated_identity_);
}
class XdsEnabledServerTest : public XdsEnd2endTest {
protected:
void SetUp() override {} // No-op -- individual tests do this themselves.

@ -14,5 +14,5 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/tools/distrib/python/grpcio_tools/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'
PROTOBUF_VERSION = '3.25.0'

@ -14,5 +14,5 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/tools/distrib/python/grpcio_tools/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'
PROTOBUF_VERSION = '3.25.0'

@ -14,5 +14,5 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/tools/distrib/python/grpcio_tools/grpc_version.py.template`!!!
VERSION = '1.60.0.dev0'
VERSION = '1.61.0.dev0'
PROTOBUF_VERSION = '3.25.0'

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.60.0-dev
PROJECT_NUMBER = 1.61.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.60.0-dev
PROJECT_NUMBER = 1.61.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 36.0.0
PROJECT_NUMBER = 37.0.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 36.0.0
PROJECT_NUMBER = 37.0.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Objective-C"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.60.0-dev
PROJECT_NUMBER = 1.61.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Objective-C"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.60.0-dev
PROJECT_NUMBER = 1.61.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC PHP"
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 1.60.0-dev
PROJECT_NUMBER = 1.61.0-dev
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

@ -15,7 +15,7 @@
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/windows/grpc_distribtests_standalone.bat"
build_file: "grpc/tools/internal_ci/windows/grpc_distribtests_cpp_dll.bat"
timeout_mins: 120
action {
define_artifacts {

@ -4987,6 +4987,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "inter_activity_latch_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save