[work-serializer] Dispatch on run experiment (relanding) (#34372)

Reverts grpc/grpc#34371
Branch: pull/34378/head
Author: Craig Tiller, committed by GitHub 1 year ago
Commit: 86b931c354 (parent: b6f01c68aa)
Changed files (53), with changed-line counts:
  1. BUILD (10)
  2. CMakeLists.txt (45)
  3. bazel/experiments.bzl (24)
  4. build_autogenerated.yaml (49)
  5. src/core/BUILD (3)
  6. src/core/ext/filters/client_channel/client_channel.cc (40)
  7. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (2)
  8. src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h (3)
  9. src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc (30)
  10. src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h (38)
  11. src/core/ext/filters/client_channel/subchannel.cc (1)
  12. src/core/ext/filters/client_channel/subchannel.h (4)
  13. src/core/ext/xds/xds_client.cc (1)
  14. src/core/lib/channel/channelz_registry.h (6)
  15. src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc (14)
  16. src/core/lib/experiments/experiments.cc (21)
  17. src/core/lib/experiments/experiments.h (13)
  18. src/core/lib/experiments/experiments.yaml (12)
  19. src/core/lib/experiments/rollouts.yaml (2)
  20. src/core/lib/gprpp/work_serializer.cc (294)
  21. src/core/lib/gprpp/work_serializer.h (34)
  22. test/core/channel/BUILD (1)
  23. test/core/channel/channelz_test.cc (53)
  24. test/core/client_channel/lb_policy/BUILD (19)
  25. test/core/client_channel/lb_policy/lb_policy_test_lib.h (375)
  26. test/core/client_channel/lb_policy/outlier_detection_test.cc (41)
  27. test/core/client_channel/lb_policy/pick_first_test.cc (52)
  28. test/core/client_channel/lb_policy/round_robin_test.cc (65)
  29. test/core/client_channel/lb_policy/weighted_round_robin_test.cc (58)
  30. test/core/client_channel/lb_policy/xds_override_host_test.cc (23)
  31. test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc (3)
  32. test/core/client_channel/resolvers/dns_resolver_test.cc (3)
  33. test/core/client_channel/resolvers/fake_resolver_test.cc (40)
  34. test/core/client_channel/resolvers/sockaddr_resolver_test.cc (4)
  35. test/core/end2end/no_server_test.cc (1)
  36. test/core/event_engine/BUILD (3)
  37. test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc (25)
  38. test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h (8)
  39. test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc (8)
  40. test/core/gprpp/BUILD (1)
  41. test/core/gprpp/work_serializer_test.cc (99)
  42. test/core/iomgr/stranded_event_test.cc (7)
  43. test/core/transport/chttp2/too_many_pings_test.cc (9)
  44. test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc (2)
  45. test/cpp/end2end/BUILD (1)
  46. test/cpp/end2end/client_lb_end2end_test.cc (23)
  47. test/cpp/end2end/grpclb_end2end_test.cc (6)
  48. test/cpp/end2end/rls_end2end_test.cc (3)
  49. test/cpp/end2end/service_config_end2end_test.cc (8)
  50. test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc (8)
  51. test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc (4)
  52. test/cpp/naming/cancel_ares_query_test.cc (3)
  53. test/cpp/naming/resolver_component_test.cc (3)

BUILD (10)

@ -2398,14 +2398,20 @@ grpc_cc_library(
hdrs = [
"//src/core:lib/gprpp/work_serializer.h",
],
external_deps = ["absl/base:core_headers"],
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
],
language = "c++",
visibility = ["@grpc:client_channel"],
deps = [
"debug_location",
"event_engine_base_hdrs",
"exec_ctx",
"gpr",
"grpc_trace",
"orphanable",
"//src/core:experiments",
],
)
@ -3067,6 +3073,7 @@ grpc_cc_library(
"//src/core:error",
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_deadline_filter",
"//src/core:grpc_service_config",
@ -3724,6 +3731,7 @@ grpc_cc_library(
"work_serializer",
"//src/core:channel_args",
"//src/core:grpc_service_config",
"//src/core:notification",
"//src/core:ref_counted",
"//src/core:useful",
],

CMakeLists.txt (45, generated)

@ -8267,7 +8267,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(cf_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -12216,7 +12215,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(fuzzing_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -13508,7 +13506,6 @@ target_include_directories(h2_ssl_cert_test
target_link_libraries(h2_ssl_cert_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -16574,7 +16571,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(oracle_event_engine_posix_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -16760,7 +16756,13 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(outlier_detection_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/outlier_detection_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(outlier_detection_test PUBLIC cxx_std_14)
target_include_directories(outlier_detection_test
@ -16785,6 +16787,7 @@ target_include_directories(outlier_detection_test
target_link_libraries(outlier_detection_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -17142,7 +17145,13 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(pick_first_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/pick_first_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(pick_first_test PUBLIC cxx_std_14)
target_include_directories(pick_first_test
@ -17167,6 +17176,7 @@ target_include_directories(pick_first_test
target_link_libraries(pick_first_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -17581,7 +17591,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(posix_endpoint_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -17655,7 +17664,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(posix_event_engine_connect_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -17700,7 +17708,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(posix_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc++_test_util
)
@ -20684,7 +20691,13 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(round_robin_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/round_robin_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(round_robin_test PUBLIC cxx_std_14)
target_include_directories(round_robin_test
@ -20709,6 +20722,7 @@ target_include_directories(round_robin_test
target_link_libraries(round_robin_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -23277,6 +23291,7 @@ add_executable(test_core_channel_channelz_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.h
test/core/channel/channelz_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/cpp/util/channel_trace_proto_helper.cc
)
target_compile_features(test_core_channel_channelz_test PUBLIC cxx_std_14)
@ -24561,7 +24576,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(thready_posix_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -25596,7 +25610,13 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(weighted_round_robin_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/weighted_round_robin_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(weighted_round_robin_test PUBLIC cxx_std_14)
target_include_directories(weighted_round_robin_test
@ -25621,6 +25641,7 @@ target_include_directories(weighted_round_robin_test
target_link_libraries(weighted_round_robin_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -25933,6 +25954,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(work_serializer_test
test/core/event_engine/event_engine_test_utils.cc
test/core/gprpp/work_serializer_test.cc
)
target_compile_features(work_serializer_test PUBLIC cxx_std_14)
@ -28632,7 +28654,13 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(xds_override_host_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/xds_override_host_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(xds_override_host_test PUBLIC cxx_std_14)
target_include_directories(xds_override_host_test
@ -28657,6 +28685,7 @@ target_include_directories(xds_override_host_test
target_link_libraries(xds_override_host_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)

bazel/experiments.bzl (24)

@ -23,11 +23,12 @@ EXPERIMENTS = {
"off": {
"core_end2end_test": [
"event_engine_listener",
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
"cpp_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -41,8 +42,8 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lame_client_test": [
"promise_based_client_call",
"lb_unit_test": [
"work_serializer_dispatch",
],
"logging_test": [
"promise_based_server_call",
@ -54,6 +55,7 @@ EXPERIMENTS = {
],
"xds_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
},
"on": {
@ -83,11 +85,12 @@ EXPERIMENTS = {
"off": {
"core_end2end_test": [
"event_engine_listener",
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
"cpp_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -101,8 +104,8 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lame_client_test": [
"promise_based_client_call",
"lb_unit_test": [
"work_serializer_dispatch",
],
"logging_test": [
"promise_based_server_call",
@ -114,6 +117,7 @@ EXPERIMENTS = {
],
"xds_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
},
"on": {
@ -147,11 +151,12 @@ EXPERIMENTS = {
"core_end2end_test": [
"event_engine_client",
"event_engine_listener",
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
"cpp_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -168,8 +173,8 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lame_client_test": [
"promise_based_client_call",
"lb_unit_test": [
"work_serializer_dispatch",
],
"logging_test": [
"promise_based_server_call",
@ -184,6 +189,7 @@ EXPERIMENTS = {
],
"xds_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
},
"on": {

build_autogenerated.yaml (49)

@ -6487,7 +6487,6 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -8841,7 +8840,6 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.cc
deps:
- gtest
- grpc_unsecure
- protobuf
- grpc_test_util
platforms:
@ -9493,7 +9491,6 @@ targets:
- test/core/event_engine/event_engine_test_utils.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
- name: h2_ssl_session_reuse_test
gtest: true
@ -11276,7 +11273,6 @@ targets:
- test/core/event_engine/test_suite/tests/server_test.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -11348,11 +11344,16 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/outlier_detection_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: overload_test
@ -11574,12 +11575,17 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/util/scoped_env_var.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/pick_first_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: pid_controller_test
@ -11844,7 +11850,6 @@ targets:
- test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -11879,7 +11884,6 @@ targets:
- test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -11911,7 +11915,6 @@ targets:
- test/cpp/util/get_grpc_test_runfile_dir.cc
deps:
- gtest
- grpc_unsecure
- grpc++_test_util
platforms:
- linux
@ -13913,11 +13916,16 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/round_robin_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: secure_auth_context_test
@ -15259,10 +15267,12 @@ targets:
build: test
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
- test/cpp/util/channel_trace_proto_helper.h
src:
- src/proto/grpc/channelz/channelz.proto
- test/core/channel/channelz_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/cpp/util/channel_trace_proto_helper.cc
deps:
- gtest
@ -16323,7 +16333,6 @@ targets:
- test/core/event_engine/test_suite/thready_posix_event_engine_test.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -16856,11 +16865,16 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/weighted_round_robin_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: win_socket_test
@ -17134,8 +17148,10 @@ targets:
build: test
run: false
language: c++
headers: []
headers:
- test/core/event_engine/event_engine_test_utils.h
src:
- test/core/event_engine/event_engine_test_utils.cc
- test/core/gprpp/work_serializer_test.cc
deps:
- gtest
@ -18171,11 +18187,16 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/mock_event_engine.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/xds_override_host_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: xds_pick_first_end2end_test

src/core/BUILD (3)

@ -2368,6 +2368,9 @@ grpc_cc_library(
"lib/event_engine/default_event_engine.h",
],
external_deps = ["absl/functional:any_invocable"],
visibility = [
"@grpc:alt_grpc_base_legacy",
],
deps = [
"channel_args",
"context",

src/core/ext/filters/client_channel/client_channel.cc (40)

@ -71,7 +71,9 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
@ -341,11 +343,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
chand_, this, Activity::current()->DebugTag().c_str(),
result.has_value() ? result->ToString().c_str() : "Pending");
}
if (!result.has_value()) {
waker_ = Activity::current()->MakeNonOwningWaker();
was_queued_ = true;
return Pending{};
}
if (!result.has_value()) return Pending{};
if (!result->ok()) return *result;
call_args.client_initial_metadata = std::move(client_initial_metadata_);
return std::move(call_args);
@ -363,10 +361,17 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
return GetContext<grpc_call_context_element>();
}
void RetryCheckResolutionLocked() override {
void OnAddToQueueLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) {
waker_ = Activity::current()->MakeNonOwningWaker();
was_queued_ = true;
}
void RetryCheckResolutionLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked()",
chand_, this);
gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked(): %s",
chand_, this, waker_.ActivityDebugTag().c_str());
}
waker_.WakeupAsync();
}
@ -383,7 +388,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
grpc_polling_entity pollent_;
ClientMetadataHandle client_initial_metadata_;
bool was_queued_ = false;
Waker waker_;
Waker waker_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_);
};
//
@ -1214,7 +1219,8 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args,
interested_parties_(grpc_pollset_set_create()),
service_config_parser_index_(
internal::ClientChannelServiceConfigParser::ParserIndex()),
work_serializer_(std::make_shared<WorkSerializer>()),
work_serializer_(
std::make_shared<WorkSerializer>(*args->channel_stack->event_engine)),
state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
subchannel_pool_(GetSubchannelPool(channel_args_)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
@ -1774,6 +1780,10 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() {
void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
const absl::Status& status,
const char* reason) {
if (state != GRPC_CHANNEL_SHUTDOWN &&
state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
Crash("Illegal transition SHUTDOWN -> anything");
}
state_tracker_.SetState(state, status, reason);
if (channelz_node_ != nullptr) {
channelz_node_->SetConnectivityState(state);
@ -1970,10 +1980,12 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
}
void ClientChannel::TryToConnectLocked() {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
} else if (resolver_ == nullptr) {
CreateResolverLocked();
if (disconnect_error_.ok()) {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
} else if (resolver_ == nullptr) {
CreateResolverLocked();
}
}
GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}

src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (2)

@ -1588,7 +1588,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() {
// Pass channel creds via channel args, since the fake resolver won't
// do this automatically.
result.args = lb_channel_args.SetObject(std::move(channel_credentials));
response_generator_->SetResponse(std::move(result));
response_generator_->SetResponseAsync(std::move(result));
// Return status.
return status;
}

src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h (3)

@ -125,7 +125,8 @@ class HealthProducer : public Subchannel::DataProducerInterface {
WeakRefCountedPtr<HealthProducer> producer_;
absl::string_view health_check_service_name_;
std::shared_ptr<WorkSerializer> work_serializer_ =
std::make_shared<WorkSerializer>();
std::make_shared<WorkSerializer>(
producer_->subchannel_->event_engine());
absl::optional<grpc_connectivity_state> state_
ABSL_GUARDED_BY(&HealthProducer::mu_);

src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc (30)

@ -214,25 +214,31 @@ FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
void FakeResolverResponseGenerator::SetResponseAndNotify(
Resolver::Result result, Notification* notify_when_set) {
RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
if (resolver_ == nullptr) {
has_result_ = true;
result_ = std::move(result);
if (notify_when_set != nullptr) notify_when_set->Notify();
return;
}
resolver = resolver_->Ref();
}
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver, std::move(result));
resolver->work_serializer_->Run([arg]() { arg->SetResponseLocked(); },
DEBUG_LOCATION);
resolver->work_serializer_->Run(
[arg, notify_when_set]() {
arg->SetResponseLocked();
if (notify_when_set != nullptr) notify_when_set->Notify();
},
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetReresolutionResponse(
Resolver::Result result) {
void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify(
Resolver::Result result, Notification* notify_when_set) {
RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
@ -242,7 +248,11 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
resolver, std::move(result), true /* has_result */);
resolver->work_serializer_->Run(
[arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
[arg, notify_when_set]() {
arg->SetReresolutionResponseLocked();
if (notify_when_set != nullptr) notify_when_set->Notify();
},
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
@ -289,6 +299,7 @@ void FakeResolverResponseGenerator::SetFakeResolver(
RefCountedPtr<FakeResolver> resolver) {
MutexLock lock(&mu_);
resolver_ = std::move(resolver);
cv_.SignalAll();
if (resolver_ == nullptr) return;
if (has_result_) {
FakeResolverResponseSetter* arg =
@ -299,6 +310,13 @@ void FakeResolverResponseGenerator::SetFakeResolver(
}
}
void FakeResolverResponseGenerator::WaitForResolverSet() {
MutexLock lock(&mu_);
while (resolver_ == nullptr) {
cv_.Wait(&mu_);
}
}
namespace {
void* ResponseGeneratorChannelArgCopy(void* p) {

src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h (38)

@ -19,12 +19,15 @@
#include <grpc/support/port_platform.h>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -57,13 +60,39 @@ class FakeResolverResponseGenerator
// instance to trigger a new resolution with the specified result. If the
// resolver is not available yet, delays response setting until it is. This
// can be called at most once before the resolver is available.
void SetResponse(Resolver::Result result);
// notify_when_set is an optional notification to signal when the response has
// been set.
void SetResponseAndNotify(Resolver::Result result,
Notification* notify_when_set);
// Same as SetResponseAndNotify(), but assumes that setting asynchronously is fine
void SetResponseAsync(Resolver::Result result) {
SetResponseAndNotify(std::move(result), nullptr);
}
// Same as SetResponseAndNotify(), but creates and waits for the notification
void SetResponseSynchronously(Resolver::Result result) {
Notification n;
SetResponseAndNotify(std::move(result), &n);
n.WaitForNotification();
}
// Sets the re-resolution response, which is returned by the fake resolver
// when re-resolution is requested (via \a RequestReresolutionLocked()).
// The new re-resolution response replaces any previous re-resolution
// response that may have been set by a previous call.
void SetReresolutionResponse(Resolver::Result result);
// notify_when_set is an optional notification to signal when the response has
// been set.
void SetReresolutionResponseAndNotify(Resolver::Result result,
Notification* notify_when_set);
void SetReresolutionResponseAsync(Resolver::Result result) {
SetReresolutionResponseAndNotify(std::move(result), nullptr);
}
void SetReresolutionResponseSynchronously(Resolver::Result result) {
Notification n;
SetReresolutionResponseAndNotify(std::move(result), &n);
n.WaitForNotification();
}
// Unsets the re-resolution response. After this, the fake resolver will
// not return anything when \a RequestReresolutionLocked() is called.
@ -88,6 +117,10 @@ class FakeResolverResponseGenerator
return GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
}
// Wait for a resolver to be set (setting may be happening asynchronously, so
// this may block - consider it test only).
void WaitForResolverSet();
static int ChannelArgsCompare(const FakeResolverResponseGenerator* a,
const FakeResolverResponseGenerator* b) {
return QsortCompare(a, b);
@ -100,6 +133,7 @@ class FakeResolverResponseGenerator
// Mutex protecting the members below.
Mutex mu_;
CondVar cv_;
RefCountedPtr<FakeResolver> resolver_ ABSL_GUARDED_BY(mu_);
Resolver::Result result_ ABSL_GUARDED_BY(mu_);
bool has_result_ ABSL_GUARDED_BY(mu_) = false;
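The split above gives callers an explicit choice between fire-and-forget and blocking response delivery now that the fake resolver's WorkSerializer may dispatch callbacks asynchronously. Below is a minimal sketch of how test code might use the two variants; the helper names and the elided Result population are illustrative, only the generator API comes from this header.

// Sketch only: wiring the generator into a channel via its channel arg and
// populating the Resolver::Result are elided.
#include <utility>

#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"

// Test-side helper: blocks until the resolver has picked up the result, so
// assertions that follow observe the new addresses.
void PushResultAndWait(grpc_core::FakeResolverResponseGenerator* generator,
                       grpc_core::Resolver::Result result) {
  generator->SetResponseSynchronously(std::move(result));
}

// Non-blocking helper: delivers the result asynchronously, which is the safe
// choice when the caller may already be running inside a WorkSerializer
// (this is why grpclb now calls SetResponseAsync() above).
void PushResultAsync(grpc_core::FakeResolverResponseGenerator* generator,
                     grpc_core::Resolver::Result result) {
  generator->SetResponseAsync(std::move(result));
}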

src/core/ext/filters/client_channel/subchannel.cc (1)

@ -463,6 +463,7 @@ Subchannel::Subchannel(SubchannelKey key,
pollset_set_(grpc_pollset_set_create()),
connector_(std::move(connector)),
watcher_list_(this),
work_serializer_(args_.GetObjectRef<EventEngine>()),
backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
event_engine_(args_.GetObjectRef<EventEngine>()) {
// A grpc_init is added here to ensure that grpc_shutdown does not happen

src/core/ext/filters/client_channel/subchannel.h (4)

@ -272,6 +272,10 @@ class Subchannel : public DualRefCounted<Subchannel> {
void RemoveDataProducer(DataProducerInterface* data_producer)
ABSL_LOCKS_EXCLUDED(mu_);
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine() {
return event_engine_;
}
private:
// A linked list of ConnectivityStateWatcherInterfaces that are monitoring
// the subchannel's state.

src/core/ext/xds/xds_client.cc (1)

@ -1491,6 +1491,7 @@ XdsClient::XdsClient(
xds_federation_enabled_(XdsFederationEnabled()),
api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_,
std::move(user_agent_name), std::move(user_agent_version)),
work_serializer_(engine),
engine_(std::move(engine)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);

src/core/lib/channel/channelz_registry.h (6)

@ -25,6 +25,8 @@
#include <map>
#include <string>
#include "absl/base/thread_annotations.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -90,8 +92,8 @@ class ChannelzRegistry {
// protects members
Mutex mu_;
std::map<intptr_t, BaseNode*> node_map_;
intptr_t uuid_generator_ = 0;
std::map<intptr_t, BaseNode*> node_map_ ABSL_GUARDED_BY(mu_);
intptr_t uuid_generator_ ABSL_GUARDED_BY(mu_) = 0;
};
} // namespace channelz
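The registry change above is purely an annotation fix: node_map_ and uuid_generator_ were already protected by mu_, and ABSL_GUARDED_BY now lets clang's thread-safety analysis enforce that. A minimal sketch of the pattern follows, with a hypothetical class and fields; only Mutex, MutexLock, and ABSL_GUARDED_BY are real gRPC/Abseil pieces.

#include <stdint.h>

#include <map>

#include "absl/base/thread_annotations.h"

#include "src/core/lib/gprpp/sync.h"

// Hypothetical example class, not part of gRPC.
class NodeIndex {
 public:
  intptr_t Register(void* node) {
    grpc_core::MutexLock lock(&mu_);  // analysis errors if this lock is missing
    const intptr_t uuid = ++uuid_generator_;
    node_map_[uuid] = node;
    return uuid;
  }

 private:
  grpc_core::Mutex mu_;
  std::map<intptr_t, void*> node_map_ ABSL_GUARDED_BY(mu_);
  intptr_t uuid_generator_ ABSL_GUARDED_BY(mu_) = 0;
};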

src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc (14)

@ -23,6 +23,7 @@
#include <inttypes.h>
#include <atomic>
#include <chrono>
#include <memory>
#include <utility>
@ -86,10 +87,15 @@ namespace grpc_event_engine {
namespace experimental {
namespace {
// TODO(ctiller): grpc_core::Timestamp, Duration have very specific contracts
// around time caching and around when the underlying gpr_now call can be
// substituted out.
// We should probably move all usage here to std::chrono to avoid weird bugs in
// the future.
// Maximum amount of time an extra thread is allowed to idle before being
// reclaimed.
constexpr grpc_core::Duration kIdleThreadLimit =
grpc_core::Duration::Seconds(20);
constexpr auto kIdleThreadLimit = std::chrono::seconds(20);
// Rate at which "Waiting for ..." logs should be printed while quiescing.
constexpr size_t kBlockingQuiesceLogRateSeconds = 3;
// Minimum time between thread creations.
@ -424,7 +430,7 @@ bool WorkStealingThreadPool::ThreadState::Step() {
// * the global queue is empty
// * the steal pool returns nullptr
bool should_run_again = false;
grpc_core::Timestamp start_time{grpc_core::Timestamp::Now()};
auto start_time = std::chrono::steady_clock::now();
// Wait until work is available or until shut down.
while (!pool_->IsForking()) {
// Pull from the global queue next
@ -452,7 +458,7 @@ bool WorkStealingThreadPool::ThreadState::Step() {
// has been idle long enough.
if (timed_out &&
pool_->living_thread_count()->count() > pool_->reserve_threads() &&
grpc_core::Timestamp::Now() - start_time > kIdleThreadLimit) {
std::chrono::steady_clock::now() - start_time > kIdleThreadLimit) {
return false;
}
}
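The thread-pool change above swaps grpc_core::Timestamp for std::chrono in the idle-thread timeout, since Timestamp carries time-caching contracts (see the TODO above) that are easy to violate here. A self-contained sketch of the check the pool now performs; only kIdleThreadLimit mirrors the constant above, the function itself is illustrative.

#include <chrono>
#include <cstddef>

// Returns true once a surplus thread has been idle longer than the limit.
// steady_clock is monotonic, so the comparison is unaffected by wall-clock
// adjustments and by Timestamp's caching rules.
bool ShouldRetireIdleThread(std::chrono::steady_clock::time_point idle_since,
                            size_t living_threads, size_t reserve_threads) {
  constexpr auto kIdleThreadLimit = std::chrono::seconds(20);
  return living_threads > reserve_threads &&
         std::chrono::steady_clock::now() - idle_since > kIdleThreadLimit;
}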

src/core/lib/experiments/experiments.cc (21)

@ -96,6 +96,11 @@ const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_work_serializer_dispatch =
"Have the work serializer dispatch work to event engine for every "
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_lazier_stream_updates =
"Allow streams to consume up to 50% of the incoming window before we force "
"send a flow control update.";
@ -166,6 +171,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_keepalive_fix, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, false, false},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"lazier_stream_updates", description_lazier_stream_updates,
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
@ -257,6 +264,11 @@ const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_work_serializer_dispatch =
"Have the work serializer dispatch work to event engine for every "
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_lazier_stream_updates =
"Allow streams to consume up to 50% of the incoming window before we force "
"send a flow control update.";
@ -327,6 +339,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_keepalive_fix, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, false, false},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"lazier_stream_updates", description_lazier_stream_updates,
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
@ -418,6 +432,11 @@ const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_work_serializer_dispatch =
"Have the work serializer dispatch work to event engine for every "
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_lazier_stream_updates =
"Allow streams to consume up to 50% of the incoming window before we force "
"send a flow control update.";
@ -488,6 +507,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_keepalive_fix, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, false, false},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"lazier_stream_updates", description_lazier_stream_updates,
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,

src/core/lib/experiments/experiments.h (13)

@ -83,6 +83,7 @@ inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsUniqueMetadataStringsEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
@ -119,6 +120,7 @@ inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsUniqueMetadataStringsEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
@ -155,6 +157,7 @@ inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsUniqueMetadataStringsEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
@ -216,13 +219,17 @@ inline bool IsUniqueMetadataStringsEnabled() { return IsExperimentEnabled(18); }
inline bool IsKeepaliveFixEnabled() { return IsExperimentEnabled(19); }
#define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_SERVER_FIX
inline bool IsKeepaliveServerFixEnabled() { return IsExperimentEnabled(20); }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH
inline bool IsWorkSerializerDispatchEnabled() {
return IsExperimentEnabled(21);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(21); }
inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(22); }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); }
inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(23); }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() {
return IsExperimentEnabled(23);
return IsExperimentEnabled(24);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_CLIENT_CHANNEL_SUBCHANNEL_WRAPPER_WORK_SERIALIZER_ORPHAN
inline bool IsClientChannelSubchannelWrapperWorkSerializerOrphanEnabled() {

src/core/lib/experiments/experiments.yaml (12)

@ -81,7 +81,9 @@
(ie when all filters in a stack are promise based)
expiry: 2023/11/01
owner: ctiller@google.com
test_tags: ["core_end2end_test", "lame_client_test"]
# TODO(ctiller): re-enable once we've got some more CI bandwidth
# test_tags: ["core_end2end_test", "lame_client_test"]
test_tags: []
- name: free_large_allocator
description: If set, return all free bytes from a "big" allocator
expiry: 2023/11/01
@ -169,6 +171,14 @@
owner: yashkt@google.com
test_tags: []
allow_in_fuzzing_config: false
- name: work_serializer_dispatch
description:
Have the work serializer dispatch work to event engine for every callback,
instead of running things inline in the first thread that successfully
enqueues work.
expiry: 2024/02/10
owner: ctiller@google.com
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "lb_unit_test"]
- name: lazier_stream_updates
description:
Allow streams to consume up to 50% of the incoming window before we

src/core/lib/experiments/rollouts.yaml (2)

@ -78,6 +78,8 @@
windows: broken
- name: work_stealing
default: true
- name: work_serializer_dispatch
default: false
- name: client_privacy
default: false
- name: canary_client_privacy
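The rollout entry above keeps the experiment compiled in but defaulted to off, so the new code path is reachable only behind the generated accessor. A tiny illustrative check follows; the wrapper function is hypothetical, and runtime overrides are typically supplied through gRPC's experiment configuration (for example a GRPC_EXPERIMENTS setting, stated here as an assumption rather than something shown in this diff).

#include "src/core/lib/experiments/experiments.h"

// Hypothetical helper: take the dispatching path only when the experiment
// (default false per rollouts.yaml) has been enabled.
bool UseDispatchingWorkSerializer() {
  return grpc_core::IsWorkSerializerDispatchEnabled();
}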

src/core/lib/gprpp/work_serializer.cc (294)

@ -20,18 +20,25 @@
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include "absl/container/inlined_vector.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
@ -43,13 +50,32 @@ DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
class WorkSerializer::WorkSerializerImpl : public Orphanable {
public:
void Run(std::function<void()> callback, const DebugLocation& location);
void Schedule(std::function<void()> callback, const DebugLocation& location);
void DrainQueue();
virtual void Run(std::function<void()> callback,
const DebugLocation& location) = 0;
virtual void Schedule(std::function<void()> callback,
const DebugLocation& location) = 0;
virtual void DrainQueue() = 0;
#ifndef NDEBUG
virtual bool RunningInWorkSerializer() const = 0;
#endif
};
//
// WorkSerializer::LegacyWorkSerializer
//
class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
public:
void Run(std::function<void()> callback,
const DebugLocation& location) override;
void Schedule(std::function<void()> callback,
const DebugLocation& location) override;
void DrainQueue() override;
void Orphan() override;
#ifndef NDEBUG
bool RunningInWorkSerializer() const {
bool RunningInWorkSerializer() const override {
return std::this_thread::get_id() == current_thread_;
}
#endif
@ -89,6 +115,14 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
}
#ifndef NDEBUG
void SetCurrentThread() { current_thread_ = std::this_thread::get_id(); }
void ClearCurrentThread() { current_thread_ = std::thread::id(); }
#else
void SetCurrentThread() {}
void ClearCurrentThread() {}
#endif
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
@ -98,8 +132,8 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
#endif
};
void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
const DebugLocation& location) {
void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
const DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
@ -112,9 +146,7 @@ void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
if (GetOwners(prev_ref_pair) == 0) {
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
SetCurrentThread();
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
@ -137,7 +169,7 @@ void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
}
}
void WorkSerializer::WorkSerializerImpl::Schedule(
void WorkSerializer::LegacyWorkSerializer::Schedule(
std::function<void()> callback, const DebugLocation& location) {
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
@ -150,7 +182,7 @@ void WorkSerializer::WorkSerializerImpl::Schedule(
queue_.Push(&cb_wrapper->mpscq_node);
}
void WorkSerializer::WorkSerializerImpl::Orphan() {
void WorkSerializer::LegacyWorkSerializer::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
@ -166,7 +198,7 @@ void WorkSerializer::WorkSerializerImpl::Orphan() {
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callbacks.
void WorkSerializer::WorkSerializerImpl::DrainQueue() {
void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
@ -175,9 +207,7 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
const uint64_t prev_ref_pair =
refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
if (GetOwners(prev_ref_pair) == 0) {
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
SetCurrentThread();
// We took ownership of the WorkSerializer. Drain the queue.
DrainQueueOwned();
} else {
@ -189,7 +219,7 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
}
}
void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
}
@ -206,12 +236,10 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
}
if (GetSize(prev_ref_pair) == 2) {
// Queue drained. Give up ownership but only if queue remains empty.
#ifndef NDEBUG
// Reset current_thread_ before giving up ownership to avoid TSAN
// race. If we don't wind up giving up ownership, we'll set this
// again below before we pull the next callback out of the queue.
current_thread_ = std::thread::id();
#endif
ClearCurrentThread();
uint64_t expected = MakeRefPair(1, 1);
if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
std::memory_order_acq_rel)) {
@ -226,10 +254,8 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
delete this;
return;
}
#ifndef NDEBUG
// Didn't wind up giving up ownership, so set current_thread_ again.
current_thread_ = std::this_thread::get_id();
#endif
SetCurrentThread();
}
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
@ -253,14 +279,232 @@ void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
}
}
//
// WorkSerializer::DispatchingWorkSerializer
//
// DispatchingWorkSerializer: executes callbacks one at a time on EventEngine.
// One at a time guarantees that fixed size thread pools in EventEngine
// implementations are not starved of threads by long running work serializers.
// We implement EventEngine::Closure directly to avoid allocating once per
// callback in the queue when scheduling.
class WorkSerializer::DispatchingWorkSerializer final
: public WorkSerializerImpl,
public grpc_event_engine::experimental::EventEngine::Closure {
public:
explicit DispatchingWorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: event_engine_(std::move(event_engine)) {}
void Run(std::function<void()> callback,
const DebugLocation& location) override;
void Schedule(std::function<void()> callback,
const DebugLocation& location) override {
// We always dispatch to event engine, so Schedule and Run share semantics.
Run(callback, location);
}
void DrainQueue() override {}
void Orphan() override;
// Override EventEngine::Closure
void Run() override;
#ifndef NDEBUG
bool RunningInWorkSerializer() const override {
return running_work_serializer_ == this;
}
#endif
private:
// Wrapper to capture DebugLocation for the callback.
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
: callback(std::move(cb)), location(loc) {}
std::function<void()> callback;
// GPR_NO_UNIQUE_ADDRESS means this is 0 sized in release builds.
GPR_NO_UNIQUE_ADDRESS DebugLocation location;
};
using CallbackVector = absl::InlinedVector<CallbackWrapper, 1>;
// Refill processing_ from incoming_
// If processing_ is empty, also update running_ and return false.
// If additionally orphaned, will also delete this (therefore, it's not safe
// to touch any member variables if Refill returns false).
bool Refill();
// Perform the parts of Refill that need to acquire mu_
// Returns a tri-state indicating whether we were refilled successfully (=>
// keep running), or finished, and then if we were orphaned.
enum class RefillResult { kRefilled, kFinished, kFinishedAndOrphaned };
RefillResult RefillInner();
#ifndef NDEBUG
void SetCurrentThread() { running_work_serializer_ = this; }
void ClearCurrentThread() { running_work_serializer_ = nullptr; }
#else
void SetCurrentThread() {}
void ClearCurrentThread() {}
#endif
// Member variables are roughly sorted to keep processing cache lines
// separated from incoming cache lines.
// Callbacks that are currently being processed.
// Only accessed by: a Run() call going from not-running to running, or a work
// item being executed in EventEngine -- ie this does not need a mutex because
// all access is serialized.
// Stored in reverse execution order so that callbacks can be `pop_back()`'d
// on completion to free up any resources they hold.
CallbackVector processing_;
// EventEngine instance upon which we'll do our work.
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
// Flags containing run state:
// - running_ goes from false->true whenever the first callback is scheduled
// on an idle WorkSerializer, and transitions back to false after the last
// callback scheduled is completed and the WorkSerializer is again idle.
// - orphaned_ transitions to true once upon Orphan being called.
// When orphaned_ is true and running_ is false, the DispatchingWorkSerializer
// instance is deleted.
bool running_ ABSL_GUARDED_BY(mu_) = false;
bool orphaned_ ABSL_GUARDED_BY(mu_) = false;
Mutex mu_;
// Queued callbacks. New work items land here, and when processing_ is drained
// we move this entire queue into processing_ and work on draining it again.
// In low traffic scenarios this gives two mutex acquisitions per work item,
// but as load increases we get some natural batching and the rate of mutex
// acquisitions per work item tends towards 1.
CallbackVector incoming_ ABSL_GUARDED_BY(mu_);
#ifndef NDEBUG
static thread_local DispatchingWorkSerializer* running_work_serializer_;
#endif
};
#ifndef NDEBUG
thread_local WorkSerializer::DispatchingWorkSerializer*
WorkSerializer::DispatchingWorkSerializer::running_work_serializer_ =
nullptr;
#endif
void WorkSerializer::DispatchingWorkSerializer::Orphan() {
ReleasableMutexLock lock(&mu_);
// If we're not running, then we can delete immediately.
if (!running_) {
lock.Release();
delete this;
return;
}
// Otherwise store a flag to delete when we're done.
orphaned_ = true;
}
// Implementation of WorkSerializerImpl::Run
void WorkSerializer::DispatchingWorkSerializer::Run(
std::function<void()> callback, const DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer[%p] Scheduling callback [%s:%d]", this,
location.file(), location.line());
}
MutexLock lock(&mu_);
if (!running_) {
// If we were previously idle, insert this callback directly into the empty
// processing_ list and start running.
running_ = true;
GPR_ASSERT(processing_.empty());
processing_.emplace_back(std::move(callback), location);
event_engine_->Run(this);
} else {
// We are already running, so add this callback to the incoming_ list.
// The work loop will eventually get to it.
incoming_.emplace_back(std::move(callback), location);
}
}
// Implementation of EventEngine::Closure::Run - our actual work loop
void WorkSerializer::DispatchingWorkSerializer::Run() {
// TODO(ctiller): remove these when we can deprecate ExecCtx
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
// Grab the last element of processing_ - which is the next item in our queue
// since processing_ is stored in reverse order.
auto& cb = processing_.back();
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer[%p] Executing callback [%s:%d]", this,
cb.location.file(), cb.location.line());
}
// Run the work item.
SetCurrentThread();
cb.callback();
// pop_back here destroys the callback - freeing any resources it might hold.
// We do so before clearing the current thread in case the callback destructor
// wants to check that it's in the WorkSerializer too.
processing_.pop_back();
ClearCurrentThread();
// Check if we've drained the queue and if so refill it.
if (processing_.empty() && !Refill()) return;
// There's still work in processing_, so schedule ourselves again on
// EventEngine.
event_engine_->Run(this);
}
WorkSerializer::DispatchingWorkSerializer::RefillResult
WorkSerializer::DispatchingWorkSerializer::RefillInner() {
// Recover any memory held by processing_, so that we don't grow forever.
// Do so before acquiring a lock so we don't cause inadvertent contention.
processing_.shrink_to_fit();
MutexLock lock(&mu_);
// Swap incoming_ into processing_ - effectively lets us release memory
// (outside the lock) once per iteration for the storage vectors.
processing_.swap(incoming_);
// If there were no items, then we've finished running.
if (processing_.empty()) {
running_ = false;
// And if we're also orphaned then it's time to delete this object.
if (orphaned_) {
return RefillResult::kFinishedAndOrphaned;
} else {
return RefillResult::kFinished;
}
}
return RefillResult::kRefilled;
}
bool WorkSerializer::DispatchingWorkSerializer::Refill() {
const auto result = RefillInner();
switch (result) {
case RefillResult::kRefilled:
// Reverse processing_ so that we can pop_back() items in the correct
// order. (note that this is mostly pointer swaps inside the
// std::function's, so should be relatively cheap even for longer lists).
// Do so here so we're outside of the RefillInner lock.
std::reverse(processing_.begin(), processing_.end());
return true;
case RefillResult::kFinished:
return false;
case RefillResult::kFinishedAndOrphaned:
// Orphaned and finished - finally delete this object.
// Here so that the mutex lock in RefillInner is released.
delete this;
return false;
}
GPR_UNREACHABLE_CODE(return false);
}
//
// WorkSerializer
//
WorkSerializer::WorkSerializer()
: impl_(MakeOrphanable<WorkSerializerImpl>()) {}
WorkSerializer::WorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
: impl_(IsWorkSerializerDispatchEnabled()
? OrphanablePtr<WorkSerializerImpl>(
MakeOrphanable<DispatchingWorkSerializer>(
std::move(event_engine)))
: OrphanablePtr<WorkSerializerImpl>(
MakeOrphanable<LegacyWorkSerializer>())) {}
WorkSerializer::~WorkSerializer() {}
WorkSerializer::~WorkSerializer() = default;
void WorkSerializer::Run(std::function<void()> callback,
const DebugLocation& location) {

src/core/lib/gprpp/work_serializer.h (34)

@ -20,9 +20,12 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <memory>
#include "absl/base/thread_annotations.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
@ -44,14 +47,28 @@ namespace grpc_core {
// invoke DrainQueue() when it is safe to invoke the callback.
class ABSL_LOCKABLE WorkSerializer {
public:
WorkSerializer();
explicit WorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine);
~WorkSerializer();
// Runs a given callback on the work serializer. If there is no other thread
// currently executing the WorkSerializer, the callback is run immediately. In
// this case, the current thread is also borrowed for draining the queue for
// any callbacks that get added in the meantime.
WorkSerializer(const WorkSerializer&) = delete;
WorkSerializer& operator=(const WorkSerializer&) = delete;
WorkSerializer(WorkSerializer&&) noexcept = default;
WorkSerializer& operator=(WorkSerializer&&) noexcept = default;
// Runs a given callback on the work serializer.
//
// If experiment `work_serializer_dispatch` is enabled:
// The callback will be executed as an EventEngine callback, that then
// arranges for the next callback in the queue to execute.
//
// If experiment `work_serializer_dispatch` is NOT enabled:
// If there is no other thread currently executing the WorkSerializer, the
// callback is run immediately. In this case, the current thread is also
// borrowed for draining the queue for any callbacks that get added in the
// meantime.
// This behavior is deprecated and will be removed soon.
//
// If you want to use clang thread annotation to make sure that callback is
// called by WorkSerializer only, you need to add the annotation to both the
@ -64,9 +81,6 @@ class ABSL_LOCKABLE WorkSerializer {
// }, DEBUG_LOCATION);
// }
// void callback() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer) { ... }
//
// TODO(yashkt): Replace DebugLocation with absl::SourceLocation
// once we can start using it directly.
void Run(std::function<void()> callback, const DebugLocation& location);
// Schedule \a callback to be run later when the queue of callbacks is
@ -82,6 +96,8 @@ class ABSL_LOCKABLE WorkSerializer {
private:
class WorkSerializerImpl;
class LegacyWorkSerializer;
class DispatchingWorkSerializer;
OrphanablePtr<WorkSerializerImpl> impl_;
};
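With the constructor now taking an EventEngine, a usage sketch follows; it assumes the default EventEngine is acceptable for the caller (production code, as in the client_channel and subchannel changes above, generally passes the channel's own EventEngine instance).

#include <memory>

#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/work_serializer.h"

void ExampleUsage() {
  auto serializer = std::make_shared<grpc_core::WorkSerializer>(
      grpc_event_engine::experimental::GetDefaultEventEngine());
  // Callbacks run one at a time, in submission order. With the
  // work_serializer_dispatch experiment enabled each callback is dispatched
  // to the EventEngine; otherwise the legacy borrowed-thread path runs it.
  serializer->Run([] { /* touch state owned by this serializer */ },
                  DEBUG_LOCATION);
}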

test/core/channel/BUILD (1)

@ -123,6 +123,7 @@ grpc_cc_test(
"//:grpc",
"//:grpc++",
"//src/core:channel_args",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/util:grpc_test_util",
"//test/cpp/util:channel_trace_proto_helper",
],

test/core/channel/channelz_test.cc (53)

@ -22,13 +22,16 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <memory>
#include <ratio>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
@ -38,15 +41,21 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_reader.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/channel_trace_proto_helper.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
using grpc_event_engine::experimental::WaitForSingleOwner;
namespace grpc_core {
namespace channelz {
namespace testing {
@ -339,9 +348,15 @@ TEST_P(ChannelzChannelTest, LastCallStartedTime) {
class ChannelzRegistryBasedTest : public ::testing::TestWithParam<size_t> {
protected:
// ensure we always have a fresh registry for tests.
void SetUp() override { ChannelzRegistry::TestOnlyReset(); }
void SetUp() override {
WaitForSingleOwner(GetDefaultEventEngine());
ChannelzRegistry::TestOnlyReset();
}
void TearDown() override { ChannelzRegistry::TestOnlyReset(); }
void TearDown() override {
WaitForSingleOwner(GetDefaultEventEngine());
ChannelzRegistry::TestOnlyReset();
}
};
TEST_F(ChannelzRegistryBasedTest, BasicGetTopChannelsTest) {
@ -499,19 +514,27 @@ TEST_F(ChannelzRegistryBasedTest, GetTopChannelsUuidAfterCompaction) {
even_channels.push_back(std::make_unique<ChannelFixture>());
}
}
std::string json_str = ChannelzRegistry::GetTopChannels(0);
auto parsed_json = JsonParse(json_str);
ASSERT_TRUE(parsed_json.ok()) << parsed_json.status();
ASSERT_EQ(parsed_json->type(), Json::Type::kObject);
Json channel_json;
auto it = parsed_json->object().find("channel");
if (it != parsed_json->object().end()) channel_json = it->second;
ValidateJsonArraySize(channel_json, kLoopIterations);
std::vector<intptr_t> uuids = GetUuidListFromArray(channel_json.array());
for (int i = 0; i < kLoopIterations; ++i) {
// only the even uuids will still be present.
EXPECT_EQ((i + 1) * 2, uuids[i]);
}
Notification done;
grpc_event_engine::experimental::GetDefaultEventEngine()->RunAfter(
std::chrono::seconds(5 * grpc_test_slowdown_factor()), [&] {
ExecCtx exec_ctx;
std::string json_str = ChannelzRegistry::GetTopChannels(0);
auto parsed_json = JsonParse(json_str);
ASSERT_TRUE(parsed_json.ok()) << parsed_json.status();
ASSERT_EQ(parsed_json->type(), Json::Type::kObject);
Json channel_json;
auto it = parsed_json->object().find("channel");
if (it != parsed_json->object().end()) channel_json = it->second;
ValidateJsonArraySize(channel_json, kLoopIterations);
std::vector<intptr_t> uuids =
GetUuidListFromArray(channel_json.array());
for (int i = 0; i < kLoopIterations; ++i) {
// only the even uuids will still be present.
EXPECT_EQ((i + 1) * 2, uuids[i]);
}
done.Notify();
});
done.WaitForNotification();
}
TEST_F(ChannelzRegistryBasedTest, InternalChannelTest) {

@ -34,7 +34,8 @@ grpc_cc_library(
deps = [
"//src/core:lb_policy",
"//src/core:subchannel_interface",
"//test/core/event_engine:mock_event_engine",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/fuzzing_event_engine",
],
)
@ -82,7 +83,9 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = ["no_test_ios"],
tags = [
"no_test_ios",
],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -117,7 +120,9 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = ["no_test_ios"],
tags = [
"no_test_ios",
],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -150,7 +155,9 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = ["no_test_ios"],
tags = [
"no_test_ios",
],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -201,7 +208,9 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = ["no_test_ios"],
tags = [
"no_test_ios",
],
uses_event_engine = False,
uses_polling = False,
deps = [

@ -20,7 +20,6 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
@ -29,15 +28,14 @@
#include <initializer_list>
#include <map>
#include <memory>
#include <ratio>
#include <set>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
@ -86,7 +84,9 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/mock_event_engine.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
namespace grpc_core {
namespace testing {
@ -106,9 +106,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// given SubchannelState object.
class FakeSubchannel : public SubchannelInterface {
public:
FakeSubchannel(SubchannelState* state,
std::shared_ptr<WorkSerializer> work_serializer)
: state_(state), work_serializer_(std::move(work_serializer)) {}
explicit FakeSubchannel(SubchannelState* state) : state_(state) {}
~FakeSubchannel() override {
if (orca_watcher_ != nullptr) {
@ -153,27 +151,13 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
watcher()->OnConnectivityStateChange(new_state, status);
gpr_log(GPR_INFO, "notifying watcher: state=%s status=%s",
ConnectivityStateName(new_state), status.ToString().c_str());
watcher_->OnConnectivityStateChange(new_state, status);
}
private:
SubchannelInterface::ConnectivityStateWatcherInterface* watcher()
const {
return Match(
watcher_,
[](const std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); },
[](const std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); });
}
absl::variant<
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>,
std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>>
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
watcher_;
};
@ -181,10 +165,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
auto* watcher_ptr = watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
work_serializer_, std::move(watcher));
state_->work_serializer(), std::move(watcher));
watcher_map_[watcher_ptr] = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
@ -192,7 +176,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
auto it = watcher_map_.find(watcher);
if (it == watcher_map_.end()) return;
state_->state_tracker_.RemoveWatcher(it->second);
@ -204,8 +188,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
state_->requested_connection_ = true;
}
void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)
override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
void AddDataWatcher(
std::unique_ptr<DataWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
auto* w =
static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
@ -222,7 +207,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
auto connectivity_watcher = health_watcher_->TakeWatcher();
auto* connectivity_watcher_ptr = connectivity_watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
work_serializer_, std::move(connectivity_watcher));
state_->work_serializer(), std::move(connectivity_watcher));
health_watcher_wrapper_ = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
@ -235,7 +220,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
void CancelDataWatcher(DataWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
if (w->type() == OrcaProducer::Type()) {
@ -260,7 +245,6 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void ResetBackoff() override {}
SubchannelState* state_;
std::shared_ptr<WorkSerializer> work_serializer_;
std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
WatcherWrapper*>
watcher_map_;
@ -269,10 +253,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::unique_ptr<OrcaWatcher> orca_watcher_;
};
SubchannelState(absl::string_view address,
std::shared_ptr<WorkSerializer> work_serializer)
SubchannelState(absl::string_view address, LoadBalancingPolicyTest* test)
: address_(address),
work_serializer_(std::move(work_serializer)),
test_(test),
state_tracker_("LoadBalancingPolicyTest") {}
const std::string& address() const { return address_; }
@ -330,16 +313,44 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< "bug in test: " << ConnectivityStateName(state)
<< " must have OK status: " << status;
}
work_serializer_->Run(
[this, state, status, validate_state_transition, location]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) {
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(),
state, location);
}
state_tracker_.SetState(state, status, "set from test");
},
// Updating the state in the state tracker will enqueue
// notifications to watchers on the WorkSerializer. If any
// subchannel reports READY, the pick_first leaf policy will then
// start a health watch, whose initial notification will also be
// scheduled on the WorkSerializer. We don't want to return until
// all of those notifications have been delivered.
absl::Notification notification;
test_->work_serializer_->Run(
[&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*test_->work_serializer_) {
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(),
state, location);
}
gpr_log(GPR_INFO, "Setting state on tracker");
state_tracker_.SetState(state, status, "set from test");
// SetState() enqueued the connectivity state notifications for
// the subchannel, so we add another callback to the queue to be
// executed after those state notifications have been delivered.
gpr_log(GPR_INFO,
"Waiting for state notifications to be delivered");
test_->work_serializer_->Run(
[&]() {
gpr_log(GPR_INFO,
"State notifications delivered, waiting for health "
"notifications");
// Now the connectivity state notifications have been
// delivered. If the state reported was READY, then the
// pick_first leaf policy will have started a health watch, so
// we add another callback to the queue to be executed after
// the initial health watch notification has been delivered.
test_->work_serializer_->Run([&]() { notification.Notify(); },
DEBUG_LOCATION);
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
notification.WaitForNotification();
gpr_log(GPR_INFO, "Health notifications delivered");
}
// Indicates if any of the associated SubchannelInterface objects
@ -351,9 +362,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
// To be invoked by FakeHelper.
RefCountedPtr<SubchannelInterface> CreateSubchannel(
std::shared_ptr<WorkSerializer> work_serializer) {
return MakeRefCounted<FakeSubchannel>(this, std::move(work_serializer));
RefCountedPtr<SubchannelInterface> CreateSubchannel() {
return MakeRefCounted<FakeSubchannel>(this);
}
// Sends an OOB backend metric report to all watchers.
@ -374,10 +384,15 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
}
std::shared_ptr<WorkSerializer> work_serializer() {
return test_->work_serializer_;
}
private:
const std::string address_;
std::shared_ptr<WorkSerializer> work_serializer_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_);
LoadBalancingPolicyTest* const test_;
ConnectivityStateTracker state_tracker_
ABSL_GUARDED_BY(*test_->work_serializer_);
Mutex requested_connection_mu_;
bool requested_connection_ ABSL_GUARDED_BY(&requested_connection_mu_) =
@ -409,13 +424,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::string ToString() const { return "RERESOLUTION"; }
};
FakeHelper(LoadBalancingPolicyTest* test,
std::shared_ptr<WorkSerializer> work_serializer,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: test_(test),
work_serializer_(std::move(work_serializer)),
event_engine_(std::move(event_engine)) {}
explicit FakeHelper(LoadBalancingPolicyTest* test) : test_(test) {}
bool QueueEmpty() {
MutexLock lock(&mu_);
@ -473,6 +482,39 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
private:
// A wrapper for a picker that hops into the WorkSerializer to
// release the ref to the picker.
class PickerWrapper : public LoadBalancingPolicy::SubchannelPicker {
public:
PickerWrapper(LoadBalancingPolicyTest* test,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
: test_(test), picker_(std::move(picker)) {
gpr_log(GPR_INFO, "creating wrapper %p for picker %p", this,
picker_.get());
}
void Orphan() override {
absl::Notification notification;
test_->work_serializer_->Run(
[notification = &notification,
picker = std::move(picker_)]() mutable {
picker.reset();
notification->Notify();
},
DEBUG_LOCATION);
notification.WaitForNotification();
}
LoadBalancingPolicy::PickResult Pick(
LoadBalancingPolicy::PickArgs args) override {
return picker_->Pick(args);
}
private:
LoadBalancingPolicyTest* const test_;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_;
};
// Represents an event reported by the LB policy.
using Event = absl::variant<StateUpdate, ReresolutionRequested>;
@ -502,18 +544,19 @@ class LoadBalancingPolicyTest : public ::testing::Test {
GPR_ASSERT(address_uri.ok());
it = test_->subchannel_pool_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(std::move(*address_uri),
work_serializer_))
std::forward_as_tuple(std::move(*address_uri), test_))
.first;
}
return it->second.CreateSubchannel(work_serializer_);
return it->second.CreateSubchannel();
}
void UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
MutexLock lock(&mu_);
StateUpdate update{state, status, std::move(picker)};
StateUpdate update{
state, status,
MakeRefCounted<PickerWrapper>(test_, std::move(picker))};
gpr_log(GPR_INFO, "state update from LB policy: %s",
update.ToString().c_str());
queue_.push_back(std::move(update));
@ -536,14 +579,12 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return event_engine_.get();
return test_->fuzzing_ee_.get();
}
void AddTraceEvent(TraceSeverity, absl::string_view) override {}
LoadBalancingPolicyTest* test_;
std::shared_ptr<WorkSerializer> work_serializer_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
Mutex mu_;
std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
@ -633,10 +674,36 @@ class LoadBalancingPolicyTest : public ::testing::Test {
const absl::optional<BackendMetricData> backend_metric_data_;
};
LoadBalancingPolicyTest()
: work_serializer_(std::make_shared<WorkSerializer>()) {}
explicit LoadBalancingPolicyTest(absl::string_view lb_policy_name)
: lb_policy_name_(lb_policy_name) {}
void SetUp() override {
// Order is important here: Fuzzing EE needs to be created before
// grpc_init(), and the POSIX EE (which is used by the WorkSerializer)
// needs to be created after grpc_init().
fuzzing_ee_ =
std::make_shared<grpc_event_engine::experimental::FuzzingEventEngine>(
grpc_event_engine::experimental::FuzzingEventEngine::Options(),
fuzzing_event_engine::Actions());
grpc_init();
event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
work_serializer_ = std::make_shared<WorkSerializer>(event_engine_);
auto helper = std::make_unique<FakeHelper>(this);
helper_ = helper.get();
LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper),
ChannelArgs()};
lb_policy_ =
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
lb_policy_name_, std::move(args));
GPR_ASSERT(lb_policy_ != nullptr);
}
void TearDown() override {
fuzzing_ee_->FuzzingDone();
// Make sure pickers (and transitively, subchannels) are unreffed before
// destroying the fixture.
WaitForWorkSerializerToFlush();
work_serializer_.reset();
// Note: Can't safely trigger this from inside the FakeHelper dtor,
// because if there is a picker in the queue that is holding a ref
// to the LB policy, that will prevent the LB policy from being
@ -644,20 +711,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// (This will cause an ASAN failure, but it will not display the
// queued events, so the failure will be harder to diagnose.)
helper_->ExpectQueueEmpty();
lb_policy_.reset();
fuzzing_ee_->TickUntilIdle();
grpc_event_engine::experimental::WaitForSingleOwner(
std::move(event_engine_));
event_engine_.reset();
grpc_shutdown_blocking();
fuzzing_ee_.reset();
}
// Creates an LB policy of the specified name.
// Creates a new FakeHelper for the new LB policy, and sets helper_ to
// point to the FakeHelper.
OrphanablePtr<LoadBalancingPolicy> MakeLbPolicy(absl::string_view name) {
auto helper =
std::make_unique<FakeHelper>(this, work_serializer_, event_engine_);
helper_ = helper.get();
LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper),
ChannelArgs()};
return CoreConfiguration::Get()
.lb_policy_registry()
.CreateLoadBalancingPolicy(name, std::move(args));
LoadBalancingPolicy* lb_policy() const {
GPR_ASSERT(lb_policy_ != nullptr);
return lb_policy_.get();
}
// Creates an LB policy config from json.
@ -699,14 +764,41 @@ class LoadBalancingPolicyTest : public ::testing::Test {
LoadBalancingPolicy* lb_policy) {
ExecCtx exec_ctx;
absl::Status status;
// When the LB policy gets the update, it will create new
// subchannels, and it will register connectivity state watchers and
// optionally health watchers for each one. We don't want to return
// until all the initial notifications for all of those watchers
// have been delivered to the LB policy.
absl::Notification notification;
work_serializer_->Run(
[&]() {
status = lb_policy->UpdateLocked(std::move(update_args));
notification.Notify();
// UpdateLocked() enqueued the initial connectivity state
// notifications for the subchannels, so we add another
// callback to the queue to be executed after those initial
// state notifications have been delivered.
gpr_log(GPR_INFO,
"Applied update, waiting for initial connectivity state "
"notifications");
work_serializer_->Run(
[&]() {
gpr_log(GPR_INFO,
"Initial connectivity state notifications delivered; "
"waiting for health notifications");
// Now that the initial state notifications have been
// delivered, the queue will contain the health watch
// notifications for any subchannels in state READY,
// so we add another callback to the queue to be
// executed after those health watch notifications have
// been delivered.
work_serializer_->Run([&]() { notification.Notify(); },
DEBUG_LOCATION);
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
notification.WaitForNotification();
gpr_log(GPR_INFO, "health notifications delivered");
return status;
}
@ -859,6 +951,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return false; // Stop.
},
location);
gpr_log(GPR_INFO, "done waiting for expected RR addresses");
return retval;
}
@ -1015,18 +1108,30 @@ class LoadBalancingPolicyTest : public ::testing::Test {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectRoundRobinStartup(
absl::Span<const absl::string_view> addresses) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
// RR should ask each subchannel to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Expect the initial CONNECTING update with a picker that queues.
if (i == 0) ExpectConnectingUpdate();
// The connection attempts succeed.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
// When the first subchannel becomes READY, accept any number of
// CONNECTING updates with a picker that queues followed by a READY
// update with a picker that repeatedly returns only the first address.
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
// When each subsequent subchannel becomes READY, we accept any number
// of READY updates where the picker returns only the previously
// connected subchannel(s) followed by a READY update where the picker
// returns the previously connected subchannel(s) *and* the newly
// connected subchannel.
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
@ -1091,87 +1196,57 @@ class LoadBalancingPolicyTest : public ::testing::Test {
SubchannelKey key(MakeAddress(address), args);
auto it = subchannel_pool_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(address, work_serializer_))
std::forward_as_tuple(address, this))
.first;
return &it->second;
}
std::shared_ptr<WorkSerializer> work_serializer_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
grpc_event_engine::experimental::GetDefaultEventEngine();
FakeHelper* helper_ = nullptr;
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
};
// A subclass to be used for LB policies that start timers.
// Injects a mock EventEngine and provides the necessary framework for
// incrementing time and handling timer callbacks.
class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
protected:
// A custom time cache for which InvalidateCache() is a no-op. This
// ensures that when the timer callback instantiates its own ExecCtx
// and therefore its own ScopedTimeCache, it continues to see the time
// that we are injecting in the test.
class TestTimeCache final : public Timestamp::ScopedSource {
public:
TestTimeCache() : cached_time_(previous()->Now()) {}
Timestamp Now() override { return cached_time_; }
void InvalidateCache() override {}
void IncrementBy(Duration duration) { cached_time_ += duration; }
private:
Timestamp cached_time_;
};
TimeAwareLoadBalancingPolicyTest() {
auto mock_ee =
std::make_shared<grpc_event_engine::experimental::MockEventEngine>();
auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
absl::AnyInvocable<void()> callback) {
CheckExpectedTimerDuration(duration);
intptr_t key = next_key_++;
timer_callbacks_[key] = std::move(callback);
return grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0};
};
ON_CALL(*mock_ee,
RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
.WillByDefault(capture);
auto cancel =
[this](
grpc_event_engine::experimental::EventEngine::TaskHandle handle) {
auto it = timer_callbacks_.find(handle.keys[0]);
if (it == timer_callbacks_.end()) return false;
timer_callbacks_.erase(it);
return true;
};
ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel);
// Store in base class, to make it visible to the LB policy.
event_engine_ = std::move(mock_ee);
void WaitForWorkSerializerToFlush() {
gpr_log(GPR_INFO, "waiting for WorkSerializer to flush...");
absl::Notification notification;
work_serializer_->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
notification.WaitForNotification();
gpr_log(GPR_INFO, "WorkSerializer flush complete");
}
~TimeAwareLoadBalancingPolicyTest() override {
EXPECT_TRUE(timer_callbacks_.empty())
<< "WARNING: Test did not run all timer callbacks";
void IncrementTimeBy(Duration duration) {
fuzzing_ee_->TickForDuration(duration);
// Flush WorkSerializer, in case the timer callback enqueued anything.
WaitForWorkSerializerToFlush();
}
void RunTimerCallback() {
ASSERT_EQ(timer_callbacks_.size(), 1UL);
auto it = timer_callbacks_.begin();
ASSERT_NE(it->second, nullptr);
std::move(it->second)();
timer_callbacks_.erase(it);
void SetExpectedTimerDuration(
absl::optional<grpc_event_engine::experimental::EventEngine::Duration>
duration) {
if (duration.has_value()) {
fuzzing_ee_->SetRunAfterDurationCallback(
[expected = *duration](
grpc_event_engine::experimental::EventEngine::Duration duration) {
EXPECT_EQ(duration, expected)
<< "Expected: " << expected.count()
<< "ns\nActual: " << duration.count() << "ns";
});
} else {
fuzzing_ee_->SetRunAfterDurationCallback(nullptr);
}
}
// Called when the LB policy starts a timer.
// May be overridden by subclasses.
virtual void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration) {}
std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
intptr_t next_key_ = 1;
TestTimeCache time_cache_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
fuzzing_ee_;
// TODO(ctiller): this is a normal event engine, yet it gets its time measure
// from fuzzing_ee_ -- results are likely to be a little funky, but seem to do
// well enough for the tests we have today.
// We should transition everything here to just use fuzzing_ee_, but that
// needs some thought on how to Tick() at appropriate times, as there are
// Notification objects buried everywhere in this code, and
// WaitForNotification is deeply incompatible with a single threaded event
// engine that doesn't run callbacks until its public Tick method is called.
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
std::shared_ptr<WorkSerializer> work_serializer_;
FakeHelper* helper_ = nullptr;
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
const absl::string_view lb_policy_name_;
};
} // namespace testing
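A note on the pattern that SetConnectivityState() and ApplyUpdate() above both rely on: callbacks submitted to the WorkSerializer drain in FIFO order, so enqueueing an extra callback after triggering some work acts as a barrier for everything that work itself enqueued. A stripped-down sketch of that idea follows (illustrative only; the fixture's real helpers are shown above), reusing the absl::Notification and DEBUG_LOCATION utilities already included in this file:
// Hypothetical helper, not part of the fixture: waits for two "generations"
// of callbacks -- the work already queued plus anything that work enqueues.
void WaitForTwoGenerations(grpc_core::WorkSerializer* serializer) {
  absl::Notification done;
  serializer->Run(
      [&]() {
        // Runs after the callbacks that were already in the queue.
        serializer->Run(
            [&]() {
              // Runs after anything the previous generation enqueued.
              done.Notify();
            },
            DEBUG_LOCATION);
      },
      DEBUG_LOCATION);
  done.WaitForNotification();
}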

@ -21,7 +21,6 @@
#include <array>
#include <chrono>
#include <memory>
#include <ratio>
#include <string>
#include <utility>
#include <vector>
@ -31,14 +30,12 @@
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
@ -50,7 +47,7 @@ namespace grpc_core {
namespace testing {
namespace {
class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
class OutlierDetectionTest : public LoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
@ -146,7 +143,12 @@ class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
};
OutlierDetectionTest()
: lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {}
: LoadBalancingPolicyTest("outlier_detection_experimental") {}
void SetUp() override {
LoadBalancingPolicyTest::SetUp();
SetExpectedTimerDuration(std::chrono::seconds(10));
}
absl::optional<std::string> DoPickWithFailedCall(
LoadBalancingPolicy::SubchannelPicker* picker) {
@ -164,25 +166,13 @@ class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
}
return address;
}
void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_internal_)
<< "Expected: " << expected_internal_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration expected_internal_ =
std::chrono::seconds(10);
};
TEST_F(OutlierDetectionTest, Basic) {
constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443";
// Send an update containing one address.
absl::Status status = ApplyUpdate(
BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy_.get());
BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the address.
auto* subchannel = FindSubchannel(kAddressUri);
@ -216,7 +206,7 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.Build()),
lb_policy_.get());
lb_policy());
EXPECT_TRUE(status.ok()) << status;
// Expect normal startup.
auto picker = ExpectRoundRobinStartup(kAddresses);
@ -227,8 +217,7 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
IncrementTimeBy(Duration::Seconds(10));
gpr_log(GPR_INFO, "### ejection complete");
if (!IsRoundRobinDelegateToPickFirstEnabled()) ExpectReresolutionRequest();
// Expect a picker update.
@ -251,7 +240,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
.SetFailurePercentageRequestVolume(1)
.SetChildPolicy({{"pick_first", Json::FromObject({})}})
.Build()),
lb_policy_.get());
lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the first address.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -279,8 +268,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
IncrementTimeBy(Duration::Seconds(10));
gpr_log(GPR_INFO, "### ejection timer pass complete");
// Subchannel should not be ejected.
ExpectQueueEmpty();
@ -295,8 +283,5 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
return RUN_ALL_TESTS();
}

@ -25,6 +25,7 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gmock/gmock.h"
@ -34,7 +35,6 @@
#include <grpc/support/json.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/json/json.h"
@ -48,7 +48,7 @@ namespace {
class PickFirstTest : public LoadBalancingPolicyTest {
protected:
PickFirstTest() : lb_policy_(MakeLbPolicy("pick_first")) {}
PickFirstTest() : LoadBalancingPolicyTest("pick_first") {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakePickFirstConfig(
absl::optional<bool> shuffle_address_list = absl::nullopt) {
@ -65,9 +65,19 @@ class PickFirstTest : public LoadBalancingPolicyTest {
void GetOrderAddressesArePicked(
absl::Span<const absl::string_view> addresses,
std::vector<absl::string_view>* out_address_order) {
work_serializer_->Run([&]() { lb_policy_->ExitIdleLocked(); },
DEBUG_LOCATION);
out_address_order->clear();
// Note: ExitIdle() will enqueue a bunch of connectivity state
// notifications on the WorkSerializer, and we want to wait until
// those are delivered to the LB policy.
absl::Notification notification;
work_serializer_->Run(
[&]() {
lb_policy()->ExitIdleLocked();
work_serializer_->Run([&]() { notification.Notify(); },
DEBUG_LOCATION);
},
DEBUG_LOCATION);
notification.WaitForNotification();
// Construct a map of subchannel to address.
// We will remove entries as each subchannel starts to connect.
std::map<SubchannelState*, absl::string_view> subchannels;
@ -122,8 +132,6 @@ class PickFirstTest : public LoadBalancingPolicyTest {
subchannels.erase(subchannel);
}
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
};
TEST_F(PickFirstTest, FirstAddressWorks) {
@ -131,7 +139,7 @@ TEST_F(PickFirstTest, FirstAddressWorks) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -164,7 +172,7 @@ TEST_F(PickFirstTest, FirstAddressFails) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -216,7 +224,7 @@ TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) {
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for all addresses.
auto* subchannel3 = FindSubchannel(kAddresses[2]);
@ -259,7 +267,7 @@ TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
@ -305,7 +313,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
@ -324,7 +332,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
constexpr std::array<absl::string_view, 2> kAddresses2 = {
kAddresses[0], "ipv4:127.0.0.1:445"};
status = ApplyUpdate(BuildUpdate(kAddresses2, MakePickFirstConfig(false)),
lb_policy_.get());
lb_policy());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should have created a subchannel for the new address.
auto* subchannel3 = FindSubchannel(kAddresses2[1]);
@ -349,7 +357,7 @@ TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -410,7 +418,7 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -445,6 +453,13 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
// By checking the picker, we told the LB policy to trigger a new
// connection attempt, so it should start over with the first
// subchannel.
// Note that the picker will have enqueued the ExitIdle() call in the
// WorkSerializer, so the first flush will execute that call. But
// executing that call will result in enqueueing subchannel
// connectivity state notifications, so we need to flush again to make
// sure all of that work is done before we continue.
WaitForWorkSerializerToFlush();
WaitForWorkSerializerToFlush();
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel starts connecting.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
@ -473,7 +488,7 @@ TEST_F(PickFirstTest, WithShuffle) {
bool shuffled = false;
for (size_t i = 0; i < kMaxTries; ++i) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
GetOrderAddressesArePicked(kAddresses, &addresses_after_update);
if (absl::MakeConstSpan(addresses_after_update) !=
@ -496,7 +511,7 @@ TEST_F(PickFirstTest, ShufflingDisabled) {
constexpr static size_t kMaxAttempts = 5;
for (size_t attempt = 0; attempt < kMaxAttempts; ++attempt) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
EXPECT_TRUE(status.ok()) << status;
std::vector<absl::string_view> address_order;
GetOrderAddressesArePicked(kAddresses, &address_order);
@ -510,8 +525,5 @@ TEST_F(PickFirstTest, ShufflingDisabled) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
return RUN_ALL_TESTS();
}

@ -14,8 +14,6 @@
// limitations under the License.
//
#include <stddef.h>
#include <array>
#include "absl/status/status.h"
@ -23,11 +21,6 @@
#include "absl/types/span.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
@ -37,68 +30,33 @@ namespace {
class RoundRobinTest : public LoadBalancingPolicyTest {
protected:
RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {}
void ExpectStartup(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, nullptr), lb_policy_.get()),
absl::OkStatus());
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
ASSERT_NE(subchannel, nullptr) << "Address: " << addresses[i];
// RR should ask each subchannel to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel will connect successfully.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Expect the initial CONNECTING update with a picker that queues.
if (i == 0) ExpectConnectingUpdate();
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// As each subchannel becomes READY, we should get a new picker that
// includes the newly connected subchannel. Note that there may be any number of
// duplicate updates for the previous state in the queue before the
// update that we actually want to see.
if (i == 0) {
// When the first subchannel becomes READY, accept any number of
// CONNECTING updates with a picker that queues followed by a READY
// update with a picker that repeatedly returns only the first address.
auto picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
// When each subsequent subchannel becomes READY, we accept any number
// of READY updates where the picker returns only the previously
// connected subchannel(s) followed by a READY update where the picker
// returns the previously connected subchannel(s) *and* the newly
// connected subchannel.
WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
RoundRobinTest() : LoadBalancingPolicyTest("round_robin") {}
};
TEST_F(RoundRobinTest, Basic) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ExpectStartup(kAddresses);
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, nullptr), lb_policy()),
absl::OkStatus());
ExpectRoundRobinStartup(kAddresses);
}
TEST_F(RoundRobinTest, AddressUpdates) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
ExpectStartup(kAddresses);
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, nullptr), lb_policy()),
absl::OkStatus());
ExpectRoundRobinStartup(kAddresses);
// Send update to remove address 2.
EXPECT_EQ(
ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2), nullptr),
lb_policy_.get()),
lb_policy()),
absl::OkStatus());
WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2));
// Send update to remove address 0 and re-add address 2.
EXPECT_EQ(
ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2), nullptr),
lb_policy_.get()),
lb_policy()),
absl::OkStatus());
WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2),
absl::MakeSpan(kAddresses).last(2));
@ -115,8 +73,5 @@ TEST_F(RoundRobinTest, AddressUpdates) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
return RUN_ALL_TESTS();
}

@ -22,7 +22,6 @@
#include <chrono>
#include <map>
#include <memory>
#include <ratio>
#include <string>
#include <utility>
#include <vector>
@ -36,14 +35,12 @@
#include "absl/types/span.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
@ -67,7 +64,7 @@ BackendMetricData MakeBackendMetricData(double app_utilization, double qps,
return b;
}
class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
@ -113,8 +110,11 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
Json::Object json_;
};
WeightedRoundRobinTest() {
lb_policy_ = MakeLbPolicy("weighted_round_robin");
WeightedRoundRobinTest() : LoadBalancingPolicyTest("weighted_round_robin") {}
void SetUp() override {
LoadBalancingPolicyTest::SetUp();
SetExpectedTimerDuration(std::chrono::seconds(1));
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
@ -125,7 +125,7 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
SourceLocation location = SourceLocation()) {
if (update_addresses.empty()) update_addresses = addresses;
EXPECT_EQ(ApplyUpdate(BuildUpdate(update_addresses, config_builder.Build()),
lb_policy_.get()),
lb_policy()),
absl::OkStatus());
// Expect the initial CONNECTING update with a picker that queues.
ExpectConnectingUpdate(location);
@ -311,24 +311,11 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
if (*picker == nullptr) return false;
} else if (run_timer_callbacks) {
gpr_log(GPR_INFO, "running timer callback...");
RunTimerCallback();
// Increment time and run any timer callbacks.
IncrementTimeBy(Duration::Seconds(1));
}
// Increment time.
time_cache_.IncrementBy(Duration::Seconds(1));
}
}
void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_weight_update_interval_)
<< "Expected: " << expected_weight_update_interval_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration
expected_weight_update_interval_ = std::chrono::seconds(1);
};
TEST_F(WeightedRoundRobinTest, Basic) {
@ -643,7 +630,7 @@ TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
expected_weight_update_interval_ = std::chrono::seconds(2);
SetExpectedTimerDuration(std::chrono::seconds(2));
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Seconds(2)));
ASSERT_NE(picker, nullptr);
@ -661,7 +648,7 @@ TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
expected_weight_update_interval_ = std::chrono::milliseconds(100);
SetExpectedTimerDuration(std::chrono::milliseconds(100));
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses,
ConfigBuilder().SetWeightUpdatePeriod(Duration::Milliseconds(10)));
@ -697,8 +684,7 @@ TEST_F(WeightedRoundRobinTest, WeightExpirationPeriod) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time to make weights stale and trigger the timer callback
// to recompute weights.
time_cache_.IncrementBy(Duration::Seconds(2));
RunTimerCallback();
IncrementTimeBy(Duration::Seconds(2));
// Picker should now be falling back to round-robin.
ExpectWeightedRoundRobinPicks(
picker.get(), {},
@ -725,8 +711,7 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time to make weights stale and trigger the timer callback
// to recompute weights.
time_cache_.IncrementBy(Duration::Seconds(2));
RunTimerCallback();
IncrementTimeBy(Duration::Seconds(2));
// Picker should now be falling back to round-robin.
ExpectWeightedRoundRobinPicks(
picker.get(), {},
@ -744,8 +729,7 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
{{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time past the blackout period. This should cause the
// weights to be used.
time_cache_.IncrementBy(Duration::Seconds(1));
RunTimerCallback();
IncrementTimeBy(Duration::Seconds(1));
ExpectWeightedRoundRobinPicks(
picker.get(), {},
{{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 1}});
@ -791,8 +775,7 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
// Advance time to exceed the blackout period and trigger the timer
// callback to recompute weights.
time_cache_.IncrementBy(Duration::Seconds(1));
RunTimerCallback();
IncrementTimeBy(Duration::Seconds(1));
ExpectWeightedRoundRobinPicks(
picker.get(),
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
@ -824,9 +807,9 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodDoesNotGetResetAfterUpdate) {
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Send a duplicate update with the same addresses and config.
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()),
lb_policy_.get()),
absl::OkStatus());
EXPECT_EQ(
ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()), lb_policy()),
absl::OkStatus());
// Note that we have not advanced time, so if the update incorrectly
// triggers resetting the blackout period, none of the weights will
// actually be used.
@ -869,8 +852,5 @@ TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
return RUN_ALL_TESTS();
}

@ -34,7 +34,6 @@
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
@ -48,7 +47,7 @@ namespace {
class XdsOverrideHostTest : public LoadBalancingPolicyTest {
protected:
XdsOverrideHostTest()
: policy_(MakeLbPolicy("xds_override_host_experimental")) {}
: LoadBalancingPolicyTest("xds_override_host_experimental") {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
@ -72,7 +71,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()),
policy_.get()),
lb_policy()),
absl::OkStatus());
return ExpectRoundRobinStartup(addresses);
}
@ -96,7 +95,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
update.addresses->push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus());
EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus());
}
CallAttributes MakeOverrideHostAttribute(absl::string_view host) {
@ -105,8 +104,6 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
std::make_unique<XdsOverrideHostAttribute>(host));
return override_host_attributes;
}
OrphanablePtr<LoadBalancingPolicy> policy_;
};
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
@ -118,7 +115,7 @@ TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
EXPECT_EQ(
ApplyUpdate(
BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
policy_.get()),
lb_policy()),
absl::InvalidArgumentError("Missing policy config"));
}
@ -164,7 +161,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
// Some other address is gone
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[1]},
MakeXdsOverrideHostConfig()),
policy_.get()),
lb_policy()),
absl::OkStatus());
// Wait for LB policy to return a new picker that uses the updated
// addresses. We can't use the host override for this, because then
@ -178,7 +175,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
// "Our" address is gone so others get returned in round-robin order
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[2]},
MakeXdsOverrideHostConfig()),
policy_.get()),
lb_policy()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
// In this case, we can pass call_attributes while we wait instead of
@ -190,7 +187,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
// And now it is back
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[1], kAddresses[2]},
MakeXdsOverrideHostConfig()),
policy_.get()),
lb_policy()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
@ -338,6 +335,7 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
// picks where the override host is CONNECTING. All picks without an
// override host should not use this host.
gpr_log(GPR_INFO, "### subchannel starts reconnecting");
WaitForWorkSerializerToFlush();
EXPECT_TRUE(subchannel->ConnectionRequested());
ExpectQueueEmpty();
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
@ -469,8 +467,5 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
return RUN_ALL_TESTS();
}

@ -411,7 +411,8 @@ static void test_cooldown() {
TEST(DnsResolverCooldownTest, MainTest) {
grpc_init();
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
g_work_serializer = &work_serializer;
g_default_dns_lookup_ares = grpc_dns_lookup_hostname_ares;

@ -87,7 +87,8 @@ static void test_fails(grpc_core::ResolverFactory* factory,
}
TEST(DnsResolverTest, MainTest) {
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
g_work_serializer = &work_serializer;
grpc_core::ResolverFactory* dns = grpc_core::CoreConfiguration::Get()

@ -22,9 +22,11 @@
#include <string.h>
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -40,6 +42,8 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
@ -55,12 +59,14 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
void SetExpectedAndEvent(grpc_core::Resolver::Result expected,
gpr_event* ev) {
grpc_core::MutexLock lock(&mu_);
ASSERT_EQ(ev_, nullptr);
expected_ = std::move(expected);
ev_ = ev;
}
void ReportResult(grpc_core::Resolver::Result actual) override {
grpc_core::MutexLock lock(&mu_);
ASSERT_NE(ev_, nullptr);
// We only check the addresses, because that's the only thing
// explicitly set by the test via
@ -75,8 +81,9 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
}
private:
grpc_core::Resolver::Result expected_;
gpr_event* ev_ = nullptr;
grpc_core::Mutex mu_;
grpc_core::Resolver::Result expected_ ABSL_GUARDED_BY(mu_);
gpr_event* ev_ ABSL_GUARDED_BY(mu_) = nullptr;
};
static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
@ -124,7 +131,18 @@ static grpc_core::Resolver::Result create_new_resolver_result() {
TEST(FakeResolverTest, FakeResolver) {
grpc_core::ExecCtx exec_ctx;
std::shared_ptr<grpc_core::WorkSerializer> work_serializer =
std::make_shared<grpc_core::WorkSerializer>();
std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
auto synchronously = [work_serializer](std::function<void()> do_this_thing) {
grpc_core::Notification notification;
work_serializer->Run(
[do_this_thing = std::move(do_this_thing), &notification]() mutable {
do_this_thing();
notification.Notify();
},
DEBUG_LOCATION);
notification.WaitForNotification();
};
// Create resolver.
ResultHandler* result_handler = new ResultHandler();
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
@ -134,7 +152,7 @@ TEST(FakeResolverTest, FakeResolver) {
work_serializer, response_generator.get(),
std::unique_ptr<grpc_core::Resolver::ResultHandler>(result_handler));
ASSERT_NE(resolver.get(), nullptr);
resolver->StartLocked();
synchronously([resolver = resolver.get()] { resolver->StartLocked(); });
// Test 1: normal resolution.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
@ -143,7 +161,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event ev1;
gpr_event_init(&ev1);
result_handler->SetExpectedAndEvent(result, &ev1);
response_generator->SetResponse(std::move(result));
response_generator->SetResponseSynchronously(std::move(result));
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 2: update resolution.
@ -154,7 +172,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event ev2;
gpr_event_init(&ev2);
result_handler->SetExpectedAndEvent(result, &ev2);
response_generator->SetResponse(std::move(result));
response_generator->SetResponseSynchronously(std::move(result));
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 3: normal re-resolution.
@ -168,10 +186,11 @@ TEST(FakeResolverTest, FakeResolver) {
result_handler->SetExpectedAndEvent(reresolution_result, &ev3);
// Set reresolution_results.
// No result will be returned until re-resolution is requested.
response_generator->SetReresolutionResponse(reresolution_result);
response_generator->SetReresolutionResponseSynchronously(reresolution_result);
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
synchronously(
[resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 4: repeat re-resolution.
@ -182,7 +201,8 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event_init(&ev4);
result_handler->SetExpectedAndEvent(std::move(reresolution_result), &ev4);
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
synchronously(
[resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 5: normal resolution.
@ -193,7 +213,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event ev5;
gpr_event_init(&ev5);
result_handler->SetExpectedAndEvent(result, &ev5);
response_generator->SetResponse(std::move(result));
response_generator->SetResponseSynchronously(std::move(result));
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 6: no-op.

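A note on the hunk above: the `synchronously` lambda exists because, once the work_serializer_dispatch experiment is on, WorkSerializer::Run() may enqueue the callback on the EventEngine rather than execute it inline, so the test has to hop onto the serializer and wait before proceeding. A minimal sketch of the same pattern as a standalone helper (the free-function form and the name RunSynchronously are illustrative, not part of this change):

#include <functional>
#include <memory>
#include <utility>

#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/work_serializer.h"

// Runs `fn` on `ws` and blocks the caller until it has executed.
void RunSynchronously(const std::shared_ptr<grpc_core::WorkSerializer>& ws,
                      std::function<void()> fn) {
  grpc_core::Notification done;
  ws->Run(
      [fn = std::move(fn), &done]() mutable {
        fn();
        done.Notify();
      },
      DEBUG_LOCATION);
  done.WaitForNotification();
}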
@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -88,7 +89,8 @@ static void test_fails(grpc_core::ResolverFactory* factory,
}
TEST(SockaddrResolverTest, MainTest) {
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
g_work_serializer = &work_serializer;
grpc_core::ResolverFactory* ipv4 = grpc_core::CoreConfiguration::Get()

@ -80,6 +80,7 @@ void run_test(bool wait_for_ready) {
grpc_core::CqVerifier::tag(1), nullptr));
{
response_generator->WaitForResolverSet();
grpc_core::ExecCtx exec_ctx;
response_generator->SetFailure();
}

@ -221,9 +221,10 @@ grpc_cc_library(
"absl/time",
],
deps = [
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_unsecure",
"//src/core:channel_args_endpoint_config",
"//src/core:event_engine_common",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:memory_quota",
"//src/core:notification",

@ -202,6 +202,12 @@ void FuzzingEventEngine::TickForDuration(grpc_core::Duration d) {
TickUntilTimestamp(grpc_core::Timestamp::Now() + d);
}
void FuzzingEventEngine::SetRunAfterDurationCallback(
absl::AnyInvocable<void(Duration)> callback) {
grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
run_after_duration_callback_ = std::move(callback);
}
FuzzingEventEngine::Time FuzzingEventEngine::Now() {
grpc_core::MutexLock lock(&*now_mu_);
return now_;
@ -434,8 +440,10 @@ EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
// TODO(ctiller): do something with the timeout
// Schedule a timer to run (with some fuzzer selected delay) the on_connect
// callback.
auto task_handle = RunAfter(
Duration(0), [this, addr, on_connect = std::move(on_connect)]() mutable {
grpc_core::MutexLock lock(&*mu_);
auto task_handle = RunAfterLocked(
RunType::kRunAfter, Duration(0),
[this, addr, on_connect = std::move(on_connect)]() mutable {
// Check for a legal address and extract the target port number.
auto port = ResolvedAddressGetPort(addr);
grpc_core::MutexLock lock(&*mu_);
@ -489,11 +497,14 @@ FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
}
void FuzzingEventEngine::Run(Closure* closure) {
RunAfter(Duration::zero(), closure);
grpc_core::MutexLock lock(&*mu_);
RunAfterLocked(RunType::kRunAfter, Duration::zero(),
[closure]() { closure->Run(); });
}
void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
RunAfter(Duration::zero(), std::move(closure));
grpc_core::MutexLock lock(&*mu_);
RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
}
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
@ -503,6 +514,12 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
Duration when, absl::AnyInvocable<void()> closure) {
{
grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
if (run_after_duration_callback_ != nullptr) {
run_after_duration_callback_(when);
}
}
grpc_core::MutexLock lock(&*mu_);
// (b/258949216): Cap it to one year to avoid integer overflow errors.
return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),

@ -79,6 +79,10 @@ class FuzzingEventEngine : public EventEngine {
// Tick for some grpc_core::Duration
void TickForDuration(grpc_core::Duration d) ABSL_LOCKS_EXCLUDED(mu_);
// Sets a callback to be invoked any time RunAfter() is called.
// Allows tests to verify the specified duration.
void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback);
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
@ -290,6 +294,10 @@ class FuzzingEventEngine : public EventEngine {
std::queue<std::queue<size_t>> write_sizes_for_future_connections_
ABSL_GUARDED_BY(mu_);
grpc_pick_port_functions previous_pick_port_functions_;
grpc_core::Mutex run_after_duration_callback_mu_;
absl::AnyInvocable<void(Duration)> run_after_duration_callback_
ABSL_GUARDED_BY(run_after_duration_callback_mu_);
};
} // namespace experimental

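The new SetRunAfterDurationCallback() hook above pairs with the earlier fuzzing_event_engine.cc change: the callback fires at the top of RunAfter(), before the timer is scheduled. As a usage sketch (the helper name and the capture vector are hypothetical), a test could record every requested delay:

#include <vector>

#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"

using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::FuzzingEventEngine;

void CaptureRunAfterDurations(FuzzingEventEngine* engine,
                              std::vector<EventEngine::Duration>* durations) {
  // Invoked on every RunAfter(when, ...) call with the requested delay.
  engine->SetRunAfterDurationCallback(
      [durations](EventEngine::Duration d) { durations->push_back(d); });
}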
@ -143,6 +143,12 @@ class FuzzingResolverEventEngine
}
bool Cancel(TaskHandle /* handle */) override { return true; }
void Run(absl::AnyInvocable<void()> fn) override {
runner_.Run(std::move(fn));
}
void Run(Closure* fn) override { runner_.Run(fn); }
void Tick() { runner_.Tick(); }
private:
@ -259,7 +265,7 @@ DEFINE_PROTO_FUZZER(const event_engine_client_channel_resolver::Msg& msg) {
grpc_event_engine::experimental::GetDefaultEventEngine());
{
// scoped to ensure the resolver is orphaned when done resolving.
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(engine);
EventEngineClientChannelDNSResolverFactory resolver_factory;
auto resolver_args = ConstructResolverArgs(
grpc_core::testing::CreateChannelArgsFromFuzzingConfiguration(

@ -383,6 +383,7 @@ grpc_cc_test(
deps = [
"//:gpr",
"//:grpc",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/util:grpc_test_util",
],
)

@ -32,15 +32,22 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/util/test_config.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
using grpc_event_engine::experimental::WaitForSingleOwner;
namespace grpc_core {
namespace {
TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; }
TEST(WorkSerializerTest, NoOp) { WorkSerializer lock(GetDefaultEventEngine()); }
TEST(WorkSerializerTest, ExecuteOneRun) {
grpc_core::WorkSerializer lock;
WorkSerializer lock(GetDefaultEventEngine());
gpr_event done;
gpr_event_init(&done);
lock.Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
@ -50,7 +57,7 @@ TEST(WorkSerializerTest, ExecuteOneRun) {
}
TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
grpc_core::WorkSerializer lock;
WorkSerializer lock(GetDefaultEventEngine());
gpr_event done;
gpr_event_init(&done);
lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
@ -63,7 +70,7 @@ TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
class TestThread {
public:
explicit TestThread(grpc_core::WorkSerializer* lock)
explicit TestThread(WorkSerializer* lock)
: lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
@ -104,14 +111,14 @@ class TestThread {
DEBUG_LOCATION);
}
grpc_core::WorkSerializer* lock_ = nullptr;
grpc_core::Thread thread_;
WorkSerializer* lock_ = nullptr;
Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(WorkSerializerTest, ExecuteMany) {
grpc_core::WorkSerializer lock;
WorkSerializer lock(GetDefaultEventEngine());
{
std::vector<std::unique_ptr<TestThread>> threads;
for (size_t i = 0; i < 10; ++i) {
@ -122,7 +129,7 @@ TEST(WorkSerializerTest, ExecuteMany) {
class TestThreadScheduleAndDrain {
public:
explicit TestThreadScheduleAndDrain(grpc_core::WorkSerializer* lock)
explicit TestThreadScheduleAndDrain(WorkSerializer* lock)
: lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
@ -165,14 +172,14 @@ class TestThreadScheduleAndDrain {
DEBUG_LOCATION);
}
grpc_core::WorkSerializer* lock_ = nullptr;
grpc_core::Thread thread_;
WorkSerializer* lock_ = nullptr;
Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
grpc_core::WorkSerializer lock;
WorkSerializer lock(GetDefaultEventEngine());
{
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
for (size_t i = 0; i < 10; ++i) {
@ -182,7 +189,7 @@ TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
}
TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
grpc_core::WorkSerializer lock;
WorkSerializer lock(GetDefaultEventEngine());
{
std::vector<std::unique_ptr<TestThread>> run_threads;
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
@ -196,16 +203,17 @@ TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
// Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
WaitForSingleOwner(GetDefaultEventEngine());
}
// Tests additional racy conditions when the last callback triggers work
// serializer destruction.
TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
for (int i = 0; i < 1000; ++i) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
grpc_core::Notification notification;
auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
Notification notification;
std::thread t1([&]() {
notification.WaitForNotification();
lock.reset();
@ -218,7 +226,7 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
// Tests racy conditions when the last callback triggers work
// serializer destruction.
TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
auto lock = std::make_shared<grpc_core::WorkSerializer>();
auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
absl::Barrier barrier(11);
std::vector<std::thread> threads;
threads.reserve(10);
@ -237,42 +245,53 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
#ifndef NDEBUG
TEST(WorkSerializerTest, RunningInWorkSerializer) {
grpc_core::WorkSerializer work_serializer1;
grpc_core::WorkSerializer work_serializer2;
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
auto work_serializer1 =
std::make_shared<WorkSerializer>(GetDefaultEventEngine());
auto work_serializer2 =
std::make_shared<WorkSerializer>(GetDefaultEventEngine());
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
work_serializer1->Run(
[=]() {
EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
work_serializer2->Run(
[=]() {
EXPECT_EQ(work_serializer1->RunningInWorkSerializer(),
!IsWorkSerializerDispatchEnabled());
EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
work_serializer2->Run(
[=]() {
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
work_serializer1->Run(
[=]() {
EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
EXPECT_EQ(work_serializer2->RunningInWorkSerializer(),
!IsWorkSerializerDispatchEnabled());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
Notification done1;
Notification done2;
work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION);
work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION);
done1.WaitForNotification();
done2.WaitForNotification();
}
#endif
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);

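The reworked RunningInWorkSerializer expectations above capture the semantic change: nesting Run() across two serializers no longer borrows the calling thread when dispatch is enabled, so the outer serializer's debug flag is not observed inside the inner callback. A compressed sketch of that behavior (debug-only API; the function name is ours):

#ifndef NDEBUG
#include <memory>

#include <grpc/support/log.h>

#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/work_serializer.h"

void NestedDispatchDemo() {
  using grpc_event_engine::experimental::GetDefaultEventEngine;
  auto outer =
      std::make_shared<grpc_core::WorkSerializer>(GetDefaultEventEngine());
  auto inner =
      std::make_shared<grpc_core::WorkSerializer>(GetDefaultEventEngine());
  grpc_core::Notification done;
  outer->Run(
      [outer, inner, &done]() {
        inner->Run(
            [outer, inner, &done]() {
              // Dispatch off: the inner callback runs inline under the outer
              // serializer, so both flags are set. Dispatch on: it runs on an
              // EventEngine thread and only the inner flag is set.
              GPR_ASSERT(outer->RunningInWorkSerializer() ==
                         !grpc_core::IsWorkSerializerDispatchEnabled());
              done.Notify();
            },
            DEBUG_LOCATION);
      },
      DEBUG_LOCATION);
  done.WaitForNotification();
}
#endif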
@ -358,9 +358,10 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
{
grpc_core::ExecCtx exec_ctx;
fake_resolver_response_generator->SetResponse(BuildResolverResponse(
{absl::StrCat("ipv4:", kSharedUnconnectableAddress),
absl::StrCat("ipv4:", test_server->address())}));
fake_resolver_response_generator->SetResponseSynchronously(
BuildResolverResponse(
{absl::StrCat("ipv4:", kSharedUnconnectableAddress),
absl::StrCat("ipv4:", test_server->address())}));
}
args.push_back(grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
fake_resolver_response_generator.get()));

@ -492,8 +492,9 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
for (int i = 0; i < 3; i++) {
gpr_log(GPR_INFO, "Expected keepalive time : %d",
expected_keepalive_time_sec);
response_generator->SetResponse(BuildResolverResult({absl::StrCat(
"ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
response_generator->SetResponseSynchronously(
BuildResolverResult({absl::StrCat(
"ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
// ExecCtx::Flush() might not be enough to make sure that the resolver
// result has been propagated, so sleep for a bit.
grpc_core::ExecCtx::Get()->Flush();
@ -506,7 +507,7 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
GPR_INFO,
"Client keepalive time %d should now be in sync with the server settings",
expected_keepalive_time_sec);
response_generator->SetResponse(
response_generator->SetResponseSynchronously(
BuildResolverResult({absl::StrCat("ipv4:", server_address2)}));
grpc_core::ExecCtx::Get()->Flush();
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
@ -554,7 +555,7 @@ TEST_F(KeepaliveThrottlingTest,
grpc_channel* channel =
grpc_channel_create("fake:///", creds, &client_channel_args);
grpc_channel_credentials_release(creds);
response_generator->SetResponse(
response_generator->SetResponseSynchronously(
BuildResolverResult({absl::StrCat("ipv4:", server_address1),
absl::StrCat("ipv4:", server_address2)}));
// For a single subchannel 3 GOAWAYs would be sufficient to increase the

@ -76,7 +76,7 @@ void TryConnectAndDestroy() {
<< lb_address_result.service_config.status();
lb_address_result.args = grpc_core::SetGrpcLbBalancerAddresses(
grpc_core::ChannelArgs(), addresses);
response_generator->SetResponse(lb_address_result);
response_generator->SetResponseAsync(lb_address_result);
grpc::ChannelArguments args;
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator.get());

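On the renamed setters: the stress test above deliberately keeps a non-blocking SetResponseAsync(), while the end2end tests below move to SetResponseSynchronously(). Our reading of the split (an inference from this diff, not a documented contract) is that the synchronous variant does not return until the result has been handed to the resolver, which is the ordering those tests rely on. Call-site sketch with hypothetical helper names:

#include <utility>

#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/iomgr/exec_ctx.h"

// Mirrors the end2end tests below: result delivery order matters to the caller.
void PushResultAndWait(grpc_core::FakeResolverResponseGenerator* gen,
                       grpc_core::Resolver::Result result) {
  grpc_core::ExecCtx exec_ctx;  // The surrounding tests hold an ExecCtx here.
  gen->SetResponseSynchronously(std::move(result));
}

// Mirrors the stress test above: fire-and-forget is sufficient.
void PushResultNoWait(grpc_core::FakeResolverResponseGenerator* gen,
                      grpc_core::Resolver::Result result) {
  grpc_core::ExecCtx exec_ctx;
  gen->SetResponseAsync(std::move(result));
}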
@ -596,6 +596,7 @@ grpc_cc_test(
"gtest",
],
flaky = True, # TODO(b/150567713)
shard_count = 20,
tags = [
"cpp_end2end_test",
"cpp_lb_end2end_test",

@ -13,6 +13,7 @@
// limitations under the License.
#include <algorithm>
#include <chrono>
#include <deque>
#include <memory>
#include <mutex>
@ -219,13 +220,13 @@ class FakeResolverResponseGeneratorWrapper {
const grpc_core::ChannelArgs& per_address_args =
grpc_core::ChannelArgs()) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponse(BuildFakeResults(
response_generator_->SetResponseSynchronously(BuildFakeResults(
ipv6_only_, ports, service_config_json, per_address_args));
}
void SetNextResolutionUponError(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetReresolutionResponse(
response_generator_->SetReresolutionResponseSynchronously(
BuildFakeResults(ipv6_only_, ports));
}
@ -236,7 +237,7 @@ class FakeResolverResponseGeneratorWrapper {
void SetResponse(grpc_core::Resolver::Result result) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponse(std::move(result));
response_generator_->SetResponseSynchronously(std::move(result));
}
grpc_core::FakeResolverResponseGenerator* Get() const {
@ -635,10 +636,14 @@ TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
// Note that this call also returns IDLE, since the state change has
// not yet occurred; it just gets triggered by this call.
EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
// Now that the channel is trying to connect, we should be in state
// Now that the channel is trying to connect, we should get to state
// CONNECTING.
EXPECT_EQ(channel->GetState(false /* try_to_connect */),
GRPC_CHANNEL_CONNECTING);
ASSERT_TRUE(
WaitForChannelState(channel.get(), [&](grpc_connectivity_state state) {
if (state == GRPC_CHANNEL_IDLE) return false;
EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
return true;
}));
// Return a resolver result, which allows the connection attempt to proceed.
response_generator.SetNextResolution(GetServersPorts());
// We should eventually transition into state READY.
@ -1300,6 +1305,7 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
grpc_core::Resolver::Result result;
result.addresses.emplace();
result.result_health_callback = [&](absl::Status status) {
gpr_log(GPR_INFO, "****** RESULT HEALTH CALLBACK *******");
EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
EXPECT_EQ("address list must not be empty", status.message()) << status;
notification.Notify();
@ -1312,8 +1318,8 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
};
EXPECT_TRUE(
WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
// Callback should have been run.
ASSERT_TRUE(notification.HasBeenNotified());
// Callback should run.
notification.WaitForNotification();
// Return a valid address.
gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******");
StartServers(1);
@ -1608,6 +1614,7 @@ TEST_F(RoundRobinTest, Updates) {
ports.clear();
ports.emplace_back(servers_[1]->port_);
response_generator.SetNextResolution(ports);
WaitForChannelReady(channel.get());
WaitForServer(DEBUG_LOCATION, stub, 1);
EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
// Check LB policy name for the channel.

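The ChannelStateConnectingWhenResolving fix earlier in this file is the same asynchrony story: with dispatch enabled, the IDLE to CONNECTING transition can land a beat after GetState(true) returns, so the test now waits for it instead of asserting immediately. WaitForChannelState() is a fixture helper; the same idea against the public grpc::Channel API looks roughly like this (helper name and deadline handling are ours):

#include <chrono>

#include <grpcpp/grpcpp.h>

// Returns true once the channel has left IDLE; false if the deadline expires.
bool WaitUntilNotIdle(grpc::Channel* channel,
                      std::chrono::system_clock::time_point deadline) {
  grpc_connectivity_state state = channel->GetState(/*try_to_connect=*/true);
  while (state == GRPC_CHANNEL_IDLE) {
    // Blocks until the state moves away from `state` or the deadline hits.
    if (!channel->WaitForStateChange(state, deadline)) return false;
    state = channel->GetState(/*try_to_connect=*/false);
  }
  return true;
}

A caller would pass a deadline such as std::chrono::system_clock::now() + std::chrono::seconds(5), matching the five-second waits used elsewhere in these tests; the test above additionally asserts that the first non-IDLE state observed is CONNECTING.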
@ -605,7 +605,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = MakeResolverResult(
balancer_address_data, backend_address_data, service_config_json);
response_generator_->SetResponse(std::move(result));
response_generator_->SetResponseSynchronously(std::move(result));
}
void SetNextReresolutionResponse(
@ -613,9 +613,11 @@ class GrpclbEnd2endTest : public ::testing::Test {
const std::vector<AddressData>& backend_address_data = {},
const char* service_config_json = kDefaultServiceConfig) {
grpc_core::ExecCtx exec_ctx;
response_generator_->WaitForResolverSet();
grpc_core::Resolver::Result result = MakeResolverResult(
balancer_address_data, backend_address_data, service_config_json);
response_generator_->SetReresolutionResponse(std::move(result));
response_generator_->SetReresolutionResponseSynchronously(
std::move(result));
}
std::vector<int> GetBackendPorts(size_t start_index = 0,

@ -135,7 +135,8 @@ class FakeResolverResponseGeneratorWrapper {
void SetNextResolution(absl::string_view service_config_json) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponse(BuildFakeResults(service_config_json));
response_generator_->SetResponseSynchronously(
BuildFakeResults(service_config_json));
}
grpc_core::FakeResolverResponseGenerator* Get() const {

@ -192,7 +192,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
void SetNextResolutionNoServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
response_generator_->SetResponse(result);
response_generator_->SetResponseSynchronously(result);
}
void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
@ -201,7 +201,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
result.service_config =
grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), "{}");
ASSERT_TRUE(result.service_config.ok()) << result.service_config.status();
response_generator_->SetResponse(result);
response_generator_->SetResponseSynchronously(result);
}
void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
@ -209,7 +209,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config =
absl::InvalidArgumentError("error parsing service config");
response_generator_->SetResponse(result);
response_generator_->SetResponseSynchronously(result);
}
void SetNextResolutionWithServiceConfig(const std::vector<int>& ports,
@ -218,7 +218,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config =
grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), svc_cfg);
response_generator_->SetResponse(result);
response_generator_->SetResponseSynchronously(result);
}
std::vector<int> GetServersPorts(size_t start_index = 0) {

@ -101,7 +101,7 @@ TEST_P(LogicalDNSClusterTest, Basic) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponse(
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
std::move(result));
}
// RPCs should succeed.
@ -287,7 +287,7 @@ TEST_P(AggregateClusterTest, EdsToLogicalDns) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2));
logical_dns_cluster_resolver_response_generator_->SetResponse(
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
std::move(result));
}
// Wait for traffic to go to backend 0.
@ -348,7 +348,7 @@ TEST_P(AggregateClusterTest, LogicalDnsToEds) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(0, 1));
logical_dns_cluster_resolver_response_generator_->SetResponse(
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
std::move(result));
}
// Wait for traffic to go to backend 0.
@ -419,7 +419,7 @@ TEST_P(AggregateClusterTest, ReconfigEdsWhileLogicalDnsChildFails) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = absl::UnavailableError("injected error");
logical_dns_cluster_resolver_response_generator_->SetResponse(
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
std::move(result));
}
// When an RPC fails, we know the channel has seen the update.

@ -210,7 +210,7 @@ TEST_P(RingHashTest,
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponse(
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
std::move(result));
}
// Inject connection delay to make this act more realistically.
@ -278,7 +278,7 @@ TEST_P(RingHashTest,
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponse(
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
std::move(result));
}
// Set up connection attempt injector.

@ -107,7 +107,8 @@ void ArgsInit(ArgsStruct* args) {
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = std::make_shared<grpc_core::WorkSerializer>();
args->lock = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
gpr_atm_rel_store(&args->done_atm, 0);
args->channel_args = nullptr;
}

@ -201,7 +201,8 @@ void ArgsInit(ArgsStruct* args) {
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = std::make_shared<grpc_core::WorkSerializer>();
args->lock = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
args->done = false;
args->channel_args = nullptr;
}
