Revert "[work-serializer] Dispatch on run experiment" (#34371)

Reverts grpc/grpc#34274

(needs some changes internally)
pull/34321/head^2
Craig Tiller 1 year ago committed by GitHub
parent 1705470950
commit d589caa679
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      BUILD
  2. 45
      CMakeLists.txt
  3. 24
      bazel/experiments.bzl
  4. 49
      build_autogenerated.yaml
  5. 40
      src/core/ext/filters/client_channel/client_channel.cc
  6. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 3
      src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h
  8. 30
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  9. 38
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  10. 1
      src/core/ext/filters/client_channel/subchannel.cc
  11. 4
      src/core/ext/filters/client_channel/subchannel.h
  12. 1
      src/core/ext/xds/xds_client.cc
  13. 6
      src/core/lib/channel/channelz_registry.h
  14. 14
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  15. 21
      src/core/lib/experiments/experiments.cc
  16. 15
      src/core/lib/experiments/experiments.h
  17. 12
      src/core/lib/experiments/experiments.yaml
  18. 2
      src/core/lib/experiments/rollouts.yaml
  19. 293
      src/core/lib/gprpp/work_serializer.cc
  20. 34
      src/core/lib/gprpp/work_serializer.h
  21. 1
      test/core/channel/BUILD
  22. 53
      test/core/channel/channelz_test.cc
  23. 19
      test/core/client_channel/lb_policy/BUILD
  24. 375
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  25. 41
      test/core/client_channel/lb_policy/outlier_detection_test.cc
  26. 52
      test/core/client_channel/lb_policy/pick_first_test.cc
  27. 65
      test/core/client_channel/lb_policy/round_robin_test.cc
  28. 58
      test/core/client_channel/lb_policy/weighted_round_robin_test.cc
  29. 23
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  30. 3
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  31. 3
      test/core/client_channel/resolvers/dns_resolver_test.cc
  32. 40
      test/core/client_channel/resolvers/fake_resolver_test.cc
  33. 4
      test/core/client_channel/resolvers/sockaddr_resolver_test.cc
  34. 1
      test/core/end2end/no_server_test.cc
  35. 3
      test/core/event_engine/BUILD
  36. 25
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  37. 8
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  38. 8
      test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc
  39. 1
      test/core/gprpp/BUILD
  40. 99
      test/core/gprpp/work_serializer_test.cc
  41. 7
      test/core/iomgr/stranded_event_test.cc
  42. 9
      test/core/transport/chttp2/too_many_pings_test.cc
  43. 2
      test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc
  44. 1
      test/cpp/end2end/BUILD
  45. 23
      test/cpp/end2end/client_lb_end2end_test.cc
  46. 6
      test/cpp/end2end/grpclb_end2end_test.cc
  47. 3
      test/cpp/end2end/rls_end2end_test.cc
  48. 8
      test/cpp/end2end/service_config_end2end_test.cc
  49. 8
      test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc
  50. 4
      test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc
  51. 3
      test/cpp/naming/cancel_ares_query_test.cc
  52. 3
      test/cpp/naming/resolver_component_test.cc

10
BUILD

@ -2398,20 +2398,14 @@ grpc_cc_library(
hdrs = [
"//src/core:lib/gprpp/work_serializer.h",
],
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
],
external_deps = ["absl/base:core_headers"],
language = "c++",
visibility = ["@grpc:client_channel"],
deps = [
"debug_location",
"event_engine_base_hdrs",
"exec_ctx",
"gpr",
"grpc_trace",
"orphanable",
"//src/core:experiments",
],
)
@ -3072,7 +3066,6 @@ grpc_cc_library(
"//src/core:env",
"//src/core:error",
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_deadline_filter",
"//src/core:grpc_service_config",
@ -3730,7 +3723,6 @@ grpc_cc_library(
"work_serializer",
"//src/core:channel_args",
"//src/core:grpc_service_config",
"//src/core:notification",
"//src/core:ref_counted",
"//src/core:useful",
],

45
CMakeLists.txt generated

@ -8270,6 +8270,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(cf_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -12218,6 +12219,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(fuzzing_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -13509,6 +13511,7 @@ target_include_directories(h2_ssl_cert_test
target_link_libraries(h2_ssl_cert_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -16574,6 +16577,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(oracle_event_engine_posix_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -16759,13 +16763,7 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(outlier_detection_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/outlier_detection_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(outlier_detection_test PUBLIC cxx_std_14)
target_include_directories(outlier_detection_test
@ -16790,7 +16788,6 @@ target_include_directories(outlier_detection_test
target_link_libraries(outlier_detection_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -17148,13 +17145,7 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(pick_first_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/pick_first_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(pick_first_test PUBLIC cxx_std_14)
target_include_directories(pick_first_test
@ -17179,7 +17170,6 @@ target_include_directories(pick_first_test
target_link_libraries(pick_first_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -17594,6 +17584,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(posix_endpoint_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -17667,6 +17658,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(posix_event_engine_connect_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -17711,6 +17703,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(posix_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc++_test_util
)
@ -20694,13 +20687,7 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(round_robin_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/round_robin_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(round_robin_test PUBLIC cxx_std_14)
target_include_directories(round_robin_test
@ -20725,7 +20712,6 @@ target_include_directories(round_robin_test
target_link_libraries(round_robin_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -23294,7 +23280,6 @@ add_executable(test_core_channel_channelz_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.h
test/core/channel/channelz_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/cpp/util/channel_trace_proto_helper.cc
)
target_compile_features(test_core_channel_channelz_test PUBLIC cxx_std_14)
@ -24580,6 +24565,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
target_link_libraries(thready_posix_event_engine_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
grpc_unsecure
grpc_test_util
)
@ -25614,13 +25600,7 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(weighted_round_robin_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/weighted_round_robin_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(weighted_round_robin_test PUBLIC cxx_std_14)
target_include_directories(weighted_round_robin_test
@ -25645,7 +25625,6 @@ target_include_directories(weighted_round_robin_test
target_link_libraries(weighted_round_robin_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
@ -25958,7 +25937,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(work_serializer_test
test/core/event_engine/event_engine_test_utils.cc
test/core/gprpp/work_serializer_test.cc
)
target_compile_features(work_serializer_test PUBLIC cxx_std_14)
@ -28658,13 +28636,7 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(xds_override_host_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/xds_override_host_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(xds_override_host_test PUBLIC cxx_std_14)
target_include_directories(xds_override_host_test
@ -28689,7 +28661,6 @@ target_include_directories(xds_override_host_test
target_link_libraries(xds_override_host_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)

@ -23,12 +23,11 @@ EXPERIMENTS = {
"off": {
"core_end2end_test": [
"event_engine_listener",
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
"cpp_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -42,8 +41,8 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lb_unit_test": [
"work_serializer_dispatch",
"lame_client_test": [
"promise_based_client_call",
],
"logging_test": [
"promise_based_server_call",
@ -55,7 +54,6 @@ EXPERIMENTS = {
],
"xds_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
},
"on": {
@ -82,12 +80,11 @@ EXPERIMENTS = {
"off": {
"core_end2end_test": [
"event_engine_listener",
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
"cpp_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -101,8 +98,8 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lb_unit_test": [
"work_serializer_dispatch",
"lame_client_test": [
"promise_based_client_call",
],
"logging_test": [
"promise_based_server_call",
@ -114,7 +111,6 @@ EXPERIMENTS = {
],
"xds_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
},
"on": {
@ -145,12 +141,11 @@ EXPERIMENTS = {
"core_end2end_test": [
"event_engine_client",
"event_engine_listener",
"promise_based_client_call",
"promise_based_server_call",
"work_serializer_dispatch",
],
"cpp_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -167,8 +162,8 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"lb_unit_test": [
"work_serializer_dispatch",
"lame_client_test": [
"promise_based_client_call",
],
"logging_test": [
"promise_based_server_call",
@ -183,7 +178,6 @@ EXPERIMENTS = {
],
"xds_end2end_test": [
"promise_based_server_call",
"work_serializer_dispatch",
],
},
"on": {

@ -6493,6 +6493,7 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -8846,6 +8847,7 @@ targets:
- test/core/event_engine/test_suite/tests/timer_test.cc
deps:
- gtest
- grpc_unsecure
- protobuf
- grpc_test_util
platforms:
@ -9497,6 +9499,7 @@ targets:
- test/core/event_engine/event_engine_test_utils.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
- name: h2_ssl_session_reuse_test
gtest: true
@ -11279,6 +11282,7 @@ targets:
- test/core/event_engine/test_suite/tests/server_test.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -11350,16 +11354,11 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/outlier_detection_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: overload_test
@ -11581,17 +11580,12 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/event_engine/mock_event_engine.h
- test/core/util/scoped_env_var.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/pick_first_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: pid_controller_test
@ -11856,6 +11850,7 @@ targets:
- test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -11890,6 +11885,7 @@ targets:
- test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -11921,6 +11917,7 @@ targets:
- test/cpp/util/get_grpc_test_runfile_dir.cc
deps:
- gtest
- grpc_unsecure
- grpc++_test_util
platforms:
- linux
@ -13922,16 +13919,11 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/round_robin_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: secure_auth_context_test
@ -15273,12 +15265,10 @@ targets:
build: test
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
- test/cpp/util/channel_trace_proto_helper.h
src:
- src/proto/grpc/channelz/channelz.proto
- test/core/channel/channelz_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/cpp/util/channel_trace_proto_helper.cc
deps:
- gtest
@ -16341,6 +16331,7 @@ targets:
- test/core/event_engine/test_suite/thready_posix_event_engine_test.cc
deps:
- gtest
- grpc_unsecure
- grpc_test_util
platforms:
- linux
@ -16873,16 +16864,11 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/weighted_round_robin_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: win_socket_test
@ -17156,10 +17142,8 @@ targets:
build: test
run: false
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
headers: []
src:
- test/core/event_engine/event_engine_test_utils.cc
- test/core/gprpp/work_serializer_test.cc
deps:
- gtest
@ -18195,16 +18179,11 @@ targets:
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/event_engine/mock_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/xds_override_host_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: xds_pick_first_end2end_test

@ -70,9 +70,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/unique_type_name.h"
@ -342,7 +340,11 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
chand_, this, Activity::current()->DebugTag().c_str(),
result.has_value() ? result->ToString().c_str() : "Pending");
}
if (!result.has_value()) return Pending{};
if (!result.has_value()) {
waker_ = Activity::current()->MakeNonOwningWaker();
was_queued_ = true;
return Pending{};
}
if (!result->ok()) return *result;
call_args.client_initial_metadata = std::move(client_initial_metadata_);
return std::move(call_args);
@ -360,17 +362,10 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
return GetContext<grpc_call_context_element>();
}
void OnAddToQueueLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) {
waker_ = Activity::current()->MakeNonOwningWaker();
was_queued_ = true;
}
void RetryCheckResolutionLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) override {
void RetryCheckResolutionLocked() override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked(): %s",
chand_, this, waker_.ActivityDebugTag().c_str());
gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked()",
chand_, this);
}
waker_.WakeupAsync();
}
@ -387,7 +382,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
grpc_polling_entity pollent_;
ClientMetadataHandle client_initial_metadata_;
bool was_queued_ = false;
Waker waker_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_);
Waker waker_;
};
//
@ -1183,8 +1178,7 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args,
interested_parties_(grpc_pollset_set_create()),
service_config_parser_index_(
internal::ClientChannelServiceConfigParser::ParserIndex()),
work_serializer_(
std::make_shared<WorkSerializer>(*args->channel_stack->event_engine)),
work_serializer_(std::make_shared<WorkSerializer>()),
state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
subchannel_pool_(GetSubchannelPool(channel_args_)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
@ -1744,10 +1738,6 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() {
void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
const absl::Status& status,
const char* reason) {
if (state != GRPC_CHANNEL_SHUTDOWN &&
state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
Crash("Illegal transition SHUTDOWN -> anything");
}
state_tracker_.SetState(state, status, reason);
if (channelz_node_ != nullptr) {
channelz_node_->SetConnectivityState(state);
@ -1944,12 +1934,10 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
}
void ClientChannel::TryToConnectLocked() {
if (disconnect_error_.ok()) {
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
} else if (resolver_ == nullptr) {
CreateResolverLocked();
}
if (lb_policy_ != nullptr) {
lb_policy_->ExitIdleLocked();
} else if (resolver_ == nullptr) {
CreateResolverLocked();
}
GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}

@ -1588,7 +1588,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() {
// Pass channel creds via channel args, since the fake resolver won't
// do this automatically.
result.args = lb_channel_args.SetObject(std::move(channel_credentials));
response_generator_->SetResponseAsync(std::move(result));
response_generator_->SetResponse(std::move(result));
// Return status.
return status;
}

@ -125,8 +125,7 @@ class HealthProducer : public Subchannel::DataProducerInterface {
WeakRefCountedPtr<HealthProducer> producer_;
absl::string_view health_check_service_name_;
std::shared_ptr<WorkSerializer> work_serializer_ =
std::make_shared<WorkSerializer>(
producer_->subchannel_->event_engine());
std::make_shared<WorkSerializer>();
absl::optional<grpc_connectivity_state> state_
ABSL_GUARDED_BY(&HealthProducer::mu_);

@ -214,31 +214,25 @@ FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
void FakeResolverResponseGenerator::SetResponseAndNotify(
Resolver::Result result, Notification* notify_when_set) {
void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
if (resolver_ == nullptr) {
has_result_ = true;
result_ = std::move(result);
if (notify_when_set != nullptr) notify_when_set->Notify();
return;
}
resolver = resolver_->Ref();
}
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver, std::move(result));
resolver->work_serializer_->Run(
[arg, notify_when_set]() {
arg->SetResponseLocked();
if (notify_when_set != nullptr) notify_when_set->Notify();
},
DEBUG_LOCATION);
resolver->work_serializer_->Run([arg]() { arg->SetResponseLocked(); },
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify(
Resolver::Result result, Notification* notify_when_set) {
void FakeResolverResponseGenerator::SetReresolutionResponse(
Resolver::Result result) {
RefCountedPtr<FakeResolver> resolver;
{
MutexLock lock(&mu_);
@ -248,11 +242,7 @@ void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify(
FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
resolver, std::move(result), true /* has_result */);
resolver->work_serializer_->Run(
[arg, notify_when_set]() {
arg->SetReresolutionResponseLocked();
if (notify_when_set != nullptr) notify_when_set->Notify();
},
DEBUG_LOCATION);
[arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
@ -299,7 +289,6 @@ void FakeResolverResponseGenerator::SetFakeResolver(
RefCountedPtr<FakeResolver> resolver) {
MutexLock lock(&mu_);
resolver_ = std::move(resolver);
cv_.SignalAll();
if (resolver_ == nullptr) return;
if (has_result_) {
FakeResolverResponseSetter* arg =
@ -310,13 +299,6 @@ void FakeResolverResponseGenerator::SetFakeResolver(
}
}
void FakeResolverResponseGenerator::WaitForResolverSet() {
MutexLock lock(&mu_);
while (resolver_ == nullptr) {
cv_.Wait(&mu_);
}
}
namespace {
void* ResponseGeneratorChannelArgCopy(void* p) {

@ -19,15 +19,12 @@
#include <grpc/support/port_platform.h>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -60,39 +57,13 @@ class FakeResolverResponseGenerator
// instance to trigger a new resolution with the specified result. If the
// resolver is not available yet, delays response setting until it is. This
// can be called at most once before the resolver is available.
// notify_when_set is an optional notification to signal when the response has
// been set.
void SetResponseAndNotify(Resolver::Result result,
Notification* notify_when_set);
// Same as SetResponseAndNotify(), assume that async setting is fine
void SetResponseAsync(Resolver::Result result) {
SetResponseAndNotify(std::move(result), nullptr);
}
// Same as SetResponseAndNotify(), but create and wait for the notification
void SetResponseSynchronously(Resolver::Result result) {
Notification n;
SetResponseAndNotify(std::move(result), &n);
n.WaitForNotification();
}
void SetResponse(Resolver::Result result);
// Sets the re-resolution response, which is returned by the fake resolver
// when re-resolution is requested (via \a RequestReresolutionLocked()).
// The new re-resolution response replaces any previous re-resolution
// response that may have been set by a previous call.
// notify_when_set is an optional notification to signal when the response has
// been set.
void SetReresolutionResponseAndNotify(Resolver::Result result,
Notification* notify_when_set);
void SetReresolutionResponseAsync(Resolver::Result result) {
SetReresolutionResponseAndNotify(std::move(result), nullptr);
}
void SetReresolutionResponseSynchronously(Resolver::Result result) {
Notification n;
SetReresolutionResponseAndNotify(std::move(result), &n);
n.WaitForNotification();
}
void SetReresolutionResponse(Resolver::Result result);
// Unsets the re-resolution response. After this, the fake resolver will
// not return anything when \a RequestReresolutionLocked() is called.
@ -117,10 +88,6 @@ class FakeResolverResponseGenerator
return GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR;
}
// Wait for a resolver to be set (setting may be happening asynchronously, so
// this may block - consider it test only).
void WaitForResolverSet();
static int ChannelArgsCompare(const FakeResolverResponseGenerator* a,
const FakeResolverResponseGenerator* b) {
return QsortCompare(a, b);
@ -133,7 +100,6 @@ class FakeResolverResponseGenerator
// Mutex protecting the members below.
Mutex mu_;
CondVar cv_;
RefCountedPtr<FakeResolver> resolver_ ABSL_GUARDED_BY(mu_);
Resolver::Result result_ ABSL_GUARDED_BY(mu_);
bool has_result_ ABSL_GUARDED_BY(mu_) = false;

@ -463,7 +463,6 @@ Subchannel::Subchannel(SubchannelKey key,
pollset_set_(grpc_pollset_set_create()),
connector_(std::move(connector)),
watcher_list_(this),
work_serializer_(args_.GetObjectRef<EventEngine>()),
backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
event_engine_(args_.GetObjectRef<EventEngine>()) {
// A grpc_init is added here to ensure that grpc_shutdown does not happen

@ -272,10 +272,6 @@ class Subchannel : public DualRefCounted<Subchannel> {
void RemoveDataProducer(DataProducerInterface* data_producer)
ABSL_LOCKS_EXCLUDED(mu_);
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine() {
return event_engine_;
}
private:
// A linked list of ConnectivityStateWatcherInterfaces that are monitoring
// the subchannel's state.

@ -1491,7 +1491,6 @@ XdsClient::XdsClient(
xds_federation_enabled_(XdsFederationEnabled()),
api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_,
std::move(user_agent_name), std::move(user_agent_version)),
work_serializer_(engine),
engine_(std::move(engine)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);

@ -25,8 +25,6 @@
#include <map>
#include <string>
#include "absl/base/thread_annotations.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -92,8 +90,8 @@ class ChannelzRegistry {
// protects members
Mutex mu_;
std::map<intptr_t, BaseNode*> node_map_ ABSL_GUARDED_BY(mu_);
intptr_t uuid_generator_ ABSL_GUARDED_BY(mu_) = 0;
std::map<intptr_t, BaseNode*> node_map_;
intptr_t uuid_generator_ = 0;
};
} // namespace channelz

@ -23,7 +23,6 @@
#include <inttypes.h>
#include <atomic>
#include <chrono>
#include <memory>
#include <utility>
@ -87,15 +86,10 @@ namespace grpc_event_engine {
namespace experimental {
namespace {
// TODO(ctiller): grpc_core::Timestamp, Duration have very specific contracts
// around time caching and around when the underlying gpr_now call can be
// substituted out.
// We should probably move all usage here to std::chrono to avoid weird bugs in
// the future.
// Maximum amount of time an extra thread is allowed to idle before being
// reclaimed.
constexpr auto kIdleThreadLimit = std::chrono::seconds(20);
constexpr grpc_core::Duration kIdleThreadLimit =
grpc_core::Duration::Seconds(20);
// Rate at which "Waiting for ..." logs should be printed while quiescing.
constexpr size_t kBlockingQuiesceLogRateSeconds = 3;
// Minumum time between thread creations.
@ -430,7 +424,7 @@ bool WorkStealingThreadPool::ThreadState::Step() {
// * the global queue is empty
// * the steal pool returns nullptr
bool should_run_again = false;
auto start_time = std::chrono::steady_clock::now();
grpc_core::Timestamp start_time{grpc_core::Timestamp::Now()};
// Wait until work is available or until shut down.
while (!pool_->IsForking()) {
// Pull from the global queue next
@ -458,7 +452,7 @@ bool WorkStealingThreadPool::ThreadState::Step() {
// has been idle long enough.
if (timed_out &&
pool_->living_thread_count()->count() > pool_->reserve_threads() &&
std::chrono::steady_clock::now() - start_time > kIdleThreadLimit) {
grpc_core::Timestamp::Now() - start_time > kIdleThreadLimit) {
return false;
}
}

@ -96,11 +96,6 @@ const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_work_serializer_dispatch =
"Have the work serializer dispatch work to event engine for every "
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_lazier_stream_updates =
"Allow streams to consume up to 50% of the incoming window before we force "
"send a flow control update.";
@ -164,8 +159,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_keepalive_fix, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, false, false},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"lazier_stream_updates", description_lazier_stream_updates,
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
@ -253,11 +246,6 @@ const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_work_serializer_dispatch =
"Have the work serializer dispatch work to event engine for every "
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_lazier_stream_updates =
"Allow streams to consume up to 50% of the incoming window before we force "
"send a flow control update.";
@ -321,8 +309,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_keepalive_fix, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, false, false},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"lazier_stream_updates", description_lazier_stream_updates,
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,
@ -410,11 +396,6 @@ const char* const description_keepalive_server_fix =
"Allows overriding keepalive_permit_without_calls for servers. Refer "
"https://github.com/grpc/grpc/pull/33917 for more information.";
const char* const additional_constraints_keepalive_server_fix = "{}";
const char* const description_work_serializer_dispatch =
"Have the work serializer dispatch work to event engine for every "
"callback, instead of running things inline in the first thread that "
"successfully enqueues work.";
const char* const additional_constraints_work_serializer_dispatch = "{}";
const char* const description_lazier_stream_updates =
"Allow streams to consume up to 50% of the incoming window before we force "
"send a flow control update.";
@ -478,8 +459,6 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_keepalive_fix, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
additional_constraints_keepalive_server_fix, false, false},
{"work_serializer_dispatch", description_work_serializer_dispatch,
additional_constraints_work_serializer_dispatch, false, true},
{"lazier_stream_updates", description_lazier_stream_updates,
additional_constraints_lazier_stream_updates, true, true},
{"jitter_max_idle", description_jitter_max_idle,

@ -83,7 +83,6 @@ inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsUniqueMetadataStringsEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
@ -116,7 +115,6 @@ inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsUniqueMetadataStringsEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
@ -149,7 +147,6 @@ inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsUniqueMetadataStringsEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
@ -207,20 +204,16 @@ inline bool IsUniqueMetadataStringsEnabled() { return IsExperimentEnabled(18); }
inline bool IsKeepaliveFixEnabled() { return IsExperimentEnabled(19); }
#define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_SERVER_FIX
inline bool IsKeepaliveServerFixEnabled() { return IsExperimentEnabled(20); }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH
inline bool IsWorkSerializerDispatchEnabled() {
return IsExperimentEnabled(21);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES
inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(22); }
inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(21); }
#define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE
inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(23); }
inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); }
#define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST
inline bool IsRoundRobinDelegateToPickFirstEnabled() {
return IsExperimentEnabled(24);
return IsExperimentEnabled(23);
}
constexpr const size_t kNumExperiments = 25;
constexpr const size_t kNumExperiments = 24;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
#endif

@ -81,9 +81,7 @@
(ie when all filters in a stack are promise based)
expiry: 2023/11/01
owner: ctiller@google.com
# TODO(ctiller): re-enable once we've got some more CI bandwidth
# test_tags: ["core_end2end_test", "lame_client_test"]
test_tags: []
test_tags: ["core_end2end_test", "lame_client_test"]
- name: free_large_allocator
description: If set, return all free bytes from a "big" allocator
expiry: 2023/11/01
@ -171,14 +169,6 @@
owner: yashkt@google.com
test_tags: []
allow_in_fuzzing_config: false
- name: work_serializer_dispatch
description:
Have the work serializer dispatch work to event engine for every callback,
instead of running things inline in the first thread that successfully
enqueues work.
expiry: 2024/02/10
owner: ctiller@google.com
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "lb_unit_test"]
- name: lazier_stream_updates
description:
Allow streams to consume up to 50% of the incoming window before we

@ -78,8 +78,6 @@
windows: broken
- name: work_stealing
default: true
- name: work_serializer_dispatch
default: false
- name: client_privacy
default: false
- name: canary_client_privacy

@ -20,25 +20,18 @@
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include "absl/container/inlined_vector.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
@ -50,32 +43,13 @@ DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
class WorkSerializer::WorkSerializerImpl : public Orphanable {
public:
virtual void Run(std::function<void()> callback,
const DebugLocation& location) = 0;
virtual void Schedule(std::function<void()> callback,
const DebugLocation& location) = 0;
virtual void DrainQueue() = 0;
#ifndef NDEBUG
virtual bool RunningInWorkSerializer() const = 0;
#endif
};
//
// WorkSerializer::LegacyWorkSerializer
//
class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
public:
void Run(std::function<void()> callback,
const DebugLocation& location) override;
void Schedule(std::function<void()> callback,
const DebugLocation& location) override;
void DrainQueue() override;
void Run(std::function<void()> callback, const DebugLocation& location);
void Schedule(std::function<void()> callback, const DebugLocation& location);
void DrainQueue();
void Orphan() override;
#ifndef NDEBUG
bool RunningInWorkSerializer() const override {
bool RunningInWorkSerializer() const {
return std::this_thread::get_id() == current_thread_;
}
#endif
@ -115,14 +89,6 @@ class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
return static_cast<uint64_t>(ref_pair & 0xffffffffffffu);
}
#ifndef NDEBUG
void SetCurrentThread() { current_thread_ = std::this_thread::get_id(); }
void ClearCurrentThread() { current_thread_ = std::thread::id(); }
#else
void SetCurrentThread() {}
void ClearCurrentThread() {}
#endif
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
std::atomic<uint64_t> refs_{MakeRefPair(0, 1)};
@ -132,8 +98,8 @@ class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl {
#endif
};
void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
const DebugLocation& location) {
void WorkSerializer::WorkSerializerImpl::Run(std::function<void()> callback,
const DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
@ -146,7 +112,9 @@ void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0);
if (GetOwners(prev_ref_pair) == 0) {
// We took ownership of the WorkSerializer. Invoke callback and drain queue.
SetCurrentThread();
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Executing immediately");
}
@ -169,7 +137,7 @@ void WorkSerializer::LegacyWorkSerializer::Run(std::function<void()> callback,
}
}
void WorkSerializer::LegacyWorkSerializer::Schedule(
void WorkSerializer::WorkSerializerImpl::Schedule(
std::function<void()> callback, const DebugLocation& location) {
CallbackWrapper* cb_wrapper =
new CallbackWrapper(std::move(callback), location);
@ -182,7 +150,7 @@ void WorkSerializer::LegacyWorkSerializer::Schedule(
queue_.Push(&cb_wrapper->mpscq_node);
}
void WorkSerializer::LegacyWorkSerializer::Orphan() {
void WorkSerializer::WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
@ -198,7 +166,7 @@ void WorkSerializer::LegacyWorkSerializer::Orphan() {
// The thread that calls this loans itself to the work serializer so as to
// execute all the scheduled callbacks.
void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
void WorkSerializer::WorkSerializerImpl::DrainQueue() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
@ -207,7 +175,9 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
const uint64_t prev_ref_pair =
refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel);
if (GetOwners(prev_ref_pair) == 0) {
SetCurrentThread();
#ifndef NDEBUG
current_thread_ = std::this_thread::get_id();
#endif
// We took ownership of the WorkSerializer. Drain the queue.
DrainQueueOwned();
} else {
@ -219,7 +189,7 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueue() {
}
}
void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this);
}
@ -236,10 +206,12 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
}
if (GetSize(prev_ref_pair) == 2) {
// Queue drained. Give up ownership but only if queue remains empty.
#ifndef NDEBUG
// Reset current_thread_ before giving up ownership to avoid TSAN
// race. If we don't wind up giving up ownership, we'll set this
// again below before we pull the next callback out of the queue.
ClearCurrentThread();
current_thread_ = std::thread::id();
#endif
uint64_t expected = MakeRefPair(1, 1);
if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1),
std::memory_order_acq_rel)) {
@ -254,8 +226,10 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
delete this;
return;
}
#ifndef NDEBUG
// Didn't wind up giving up ownership, so set current_thread_ again.
SetCurrentThread();
current_thread_ = std::this_thread::get_id();
#endif
}
// There is at least one callback on the queue. Pop the callback from the
// queue and execute it.
@ -279,231 +253,14 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() {
}
}
//
// WorkSerializer::DispatchingWorkSerializer
//
// DispatchingWorkSerializer: executes callbacks one at a time on EventEngine.
// One at a time guarantees that fixed size thread pools in EventEngine
// implementations are not starved of threads by long running work serializers.
// We implement EventEngine::Closure directly to avoid allocating once per
// callback in the queue when scheduling.
class WorkSerializer::DispatchingWorkSerializer final
: public WorkSerializerImpl,
public grpc_event_engine::experimental::EventEngine::Closure {
public:
explicit DispatchingWorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: event_engine_(std::move(event_engine)) {}
void Run(std::function<void()> callback,
const DebugLocation& location) override;
void Schedule(std::function<void()> callback,
const DebugLocation& location) override {
// We always dispatch to event engine, so Schedule and Run share semantics.
Run(callback, location);
}
void DrainQueue() override {}
void Orphan() override;
// Override EventEngine::Closure
void Run() override;
#ifndef NDEBUG
bool RunningInWorkSerializer() const override {
return running_work_serializer_ == this;
}
#endif
private:
// Wrapper to capture DebugLocation for the callback.
struct CallbackWrapper {
CallbackWrapper(std::function<void()> cb, const DebugLocation& loc)
: callback(std::move(cb)), location(loc) {}
std::function<void()> callback;
// GPR_NO_UNIQUE_ADDRESS means this is 0 sized in release builds.
GPR_NO_UNIQUE_ADDRESS DebugLocation location;
};
using CallbackVector = absl::InlinedVector<CallbackWrapper, 1>;
// Refill processing_ from incoming_
// If processing_ is empty, also update running_ and return false.
// If additionally orphaned, will also delete this (therefore, it's not safe
// to touch any member variables if Refill returns false).
bool Refill();
// Perform the parts of Refill that need to acquire mu_
// Returns a tri-state indicating whether we were refilled successfully (=>
// keep running), or finished, and then if we were orphaned.
enum class RefillResult { kRefilled, kFinished, kFinishedAndOrphaned };
RefillResult RefillInner();
#ifndef NDEBUG
void SetCurrentThread() { running_work_serializer_ = this; }
void ClearCurrentThread() { running_work_serializer_ = nullptr; }
#else
void SetCurrentThread() {}
void ClearCurrentThread() {}
#endif
// Member variables are roughly sorted to keep processing cache lines
// separated from incoming cache lines.
// Callbacks that are currently being processed.
// Only accessed by: a Run() call going from not-running to running, or a work
// item being executed in EventEngine -- ie this does not need a mutex because
// all access is serialized.
// Stored in reverse execution order so that callbacks can be `pop_back()`'d
// on completion to free up any resources they hold.
CallbackVector processing_;
// EventEngine instance upon which we'll do our work.
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
// Flags containing run state:
// - running_ goes from false->true whenever the first callback is scheduled
// on an idle WorkSerializer, and transitions back to false after the last
// callback scheduled is completed and the WorkSerializer is again idle.
// - orphaned_ transitions to true once upon Orphan being called.
// When orphaned_ is true and running_ is false, the DispatchingWorkSerializer
// instance is deleted.
bool running_ ABSL_GUARDED_BY(mu_) = false;
bool orphaned_ ABSL_GUARDED_BY(mu_) = false;
Mutex mu_;
// Queued callbacks. New work items land here, and when processing_ is drained
// we move this entire queue into processing_ and work on draining it again.
// In low traffic scenarios this gives two mutex acquisitions per work item,
// but as load increases we get some natural batching and the rate of mutex
// acquisitions per work item tends towards 1.
CallbackVector incoming_ ABSL_GUARDED_BY(mu_);
#ifndef NDEBUG
static thread_local DispatchingWorkSerializer* running_work_serializer_;
#endif
};
#ifndef NDEBUG
thread_local WorkSerializer::DispatchingWorkSerializer*
WorkSerializer::DispatchingWorkSerializer::running_work_serializer_ =
nullptr;
#endif
void WorkSerializer::DispatchingWorkSerializer::Orphan() {
ReleasableMutexLock lock(&mu_);
// If we're not running, then we can delete immediately.
if (!running_) {
lock.Release();
delete this;
return;
}
// Otherwise store a flag to delete when we're done.
orphaned_ = true;
}
// Implementation of WorkSerializerImpl::Run
void WorkSerializer::DispatchingWorkSerializer::Run(
std::function<void()> callback, const DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer[%p] Scheduling callback [%s:%d]", this,
location.file(), location.line());
}
MutexLock lock(&mu_);
if (!running_) {
// If we were previously idle, insert this callback directly into the empty
// processing_ list and start running.
running_ = true;
GPR_ASSERT(processing_.empty());
processing_.emplace_back(std::move(callback), location);
event_engine_->Run(this);
} else {
// We are already running, so add this callback to the incoming_ list.
// The work loop will eventually get to it.
incoming_.emplace_back(std::move(callback), location);
}
}
// Implementation of EventEngine::Closure::Run - our actual work loop
void WorkSerializer::DispatchingWorkSerializer::Run() {
// TODO(ctiller): remove these when we can deprecate ExecCtx
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
// Grab the last element of processing_ - which is the next item in our queue
// since processing_ is stored in reverse order.
auto& cb = processing_.back();
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer[%p] Executing callback [%s:%d]", this,
cb.location.file(), cb.location.line());
}
// Run the work item.
SetCurrentThread();
cb.callback();
// pop_back here destroys the callback - freeing any resources it might hold.
// We do so before clearing the current thread in case the callback destructor
// wants to check that it's in the WorkSerializer too.
processing_.pop_back();
ClearCurrentThread();
// Check if we've drained the queue and if so refill it.
if (processing_.empty() && !Refill()) return;
// There's still work in processing_, so schedule ourselves again on
// EventEngine.
event_engine_->Run(this);
}
WorkSerializer::DispatchingWorkSerializer::RefillResult
WorkSerializer::DispatchingWorkSerializer::RefillInner() {
// Recover any memory held by processing_, so that we don't grow forever.
// Do so before acquiring a lock so we don't cause inadvertent contention.
processing_.shrink_to_fit();
MutexLock lock(&mu_);
// Swap incoming_ into processing_ - effectively lets us release memory
// (outside the lock) once per iteration for the storage vectors.
processing_.swap(incoming_);
// If there were no items, then we've finished running.
if (processing_.empty()) {
running_ = false;
// And if we're also orphaned then it's time to delete this object.
if (orphaned_) {
return RefillResult::kFinishedAndOrphaned;
} else {
return RefillResult::kFinished;
}
}
return RefillResult::kRefilled;
}
bool WorkSerializer::DispatchingWorkSerializer::Refill() {
const auto result = RefillInner();
switch (result) {
case RefillResult::kRefilled:
// Reverse processing_ so that we can pop_back() items in the correct
// order. (note that this is mostly pointer swaps inside the
// std::function's, so should be relatively cheap even for longer lists).
// Do so here so we're outside of the RefillInner lock.
std::reverse(processing_.begin(), processing_.end());
return true;
case RefillResult::kFinished:
return false;
case RefillResult::kFinishedAndOrphaned:
// Orphaned and finished - finally delete this object.
// Here so that the mutex lock in RefillInner is released.
delete this;
return false;
}
}
//
// WorkSerializer
//
WorkSerializer::WorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
: impl_(IsWorkSerializerDispatchEnabled()
? OrphanablePtr<WorkSerializerImpl>(
MakeOrphanable<DispatchingWorkSerializer>(
std::move(event_engine)))
: OrphanablePtr<WorkSerializerImpl>(
MakeOrphanable<LegacyWorkSerializer>())) {}
WorkSerializer::WorkSerializer()
: impl_(MakeOrphanable<WorkSerializerImpl>()) {}
WorkSerializer::~WorkSerializer() = default;
WorkSerializer::~WorkSerializer() {}
void WorkSerializer::Run(std::function<void()> callback,
const DebugLocation& location) {

@ -20,12 +20,9 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <memory>
#include "absl/base/thread_annotations.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
@ -47,28 +44,14 @@ namespace grpc_core {
// invoke DrainQueue() when it is safe to invoke the callback.
class ABSL_LOCKABLE WorkSerializer {
public:
explicit WorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine);
~WorkSerializer();
WorkSerializer();
WorkSerializer(const WorkSerializer&) = delete;
WorkSerializer& operator=(const WorkSerializer&) = delete;
WorkSerializer(WorkSerializer&&) noexcept = default;
WorkSerializer& operator=(WorkSerializer&&) noexcept = default;
~WorkSerializer();
// Runs a given callback on the work serializer.
//
// If experiment `work_serializer_dispatch` is enabled:
// The callback will be executed as an EventEngine callback, that then
// arranges for the next callback in the queue to execute.
//
// If experiment `work_serializer_dispatch` is NOT enabled:
// If there is no other thread currently executing the WorkSerializer, the
// callback is run immediately. In this case, the current thread is also
// borrowed for draining the queue for any callbacks that get added in the
// meantime.
// This behavior is deprecated and will be removed soon.
// Runs a given callback on the work serializer. If there is no other thread
// currently executing the WorkSerializer, the callback is run immediately. In
// this case, the current thread is also borrowed for draining the queue for
// any callbacks that get added in the meantime.
//
// If you want to use clang thread annotation to make sure that callback is
// called by WorkSerializer only, you need to add the annotation to both the
@ -81,6 +64,9 @@ class ABSL_LOCKABLE WorkSerializer {
// }, DEBUG_LOCATION);
// }
// void callback() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer) { ... }
//
// TODO(yashkt): Replace DebugLocation with absl::SourceLocation
// once we can start using it directly.
void Run(std::function<void()> callback, const DebugLocation& location);
// Schedule \a callback to be run later when the queue of callbacks is
@ -96,8 +82,6 @@ class ABSL_LOCKABLE WorkSerializer {
private:
class WorkSerializerImpl;
class LegacyWorkSerializer;
class DispatchingWorkSerializer;
OrphanablePtr<WorkSerializerImpl> impl_;
};

@ -123,7 +123,6 @@ grpc_cc_test(
"//:grpc",
"//:grpc++",
"//src/core:channel_args",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/util:grpc_test_util",
"//test/cpp/util:channel_trace_proto_helper",
],

@ -22,16 +22,13 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <memory>
#include <ratio>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
@ -41,21 +38,15 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_reader.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/server.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/channel_trace_proto_helper.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
using grpc_event_engine::experimental::WaitForSingleOwner;
namespace grpc_core {
namespace channelz {
namespace testing {
@ -348,15 +339,9 @@ TEST_P(ChannelzChannelTest, LastCallStartedTime) {
class ChannelzRegistryBasedTest : public ::testing::TestWithParam<size_t> {
protected:
// ensure we always have a fresh registry for tests.
void SetUp() override {
WaitForSingleOwner(GetDefaultEventEngine());
ChannelzRegistry::TestOnlyReset();
}
void SetUp() override { ChannelzRegistry::TestOnlyReset(); }
void TearDown() override {
WaitForSingleOwner(GetDefaultEventEngine());
ChannelzRegistry::TestOnlyReset();
}
void TearDown() override { ChannelzRegistry::TestOnlyReset(); }
};
TEST_F(ChannelzRegistryBasedTest, BasicGetTopChannelsTest) {
@ -514,27 +499,19 @@ TEST_F(ChannelzRegistryBasedTest, GetTopChannelsUuidAfterCompaction) {
even_channels.push_back(std::make_unique<ChannelFixture>());
}
}
Notification done;
grpc_event_engine::experimental::GetDefaultEventEngine()->RunAfter(
std::chrono::seconds(5 * grpc_test_slowdown_factor()), [&] {
ExecCtx exec_ctx;
std::string json_str = ChannelzRegistry::GetTopChannels(0);
auto parsed_json = JsonParse(json_str);
ASSERT_TRUE(parsed_json.ok()) << parsed_json.status();
ASSERT_EQ(parsed_json->type(), Json::Type::kObject);
Json channel_json;
auto it = parsed_json->object().find("channel");
if (it != parsed_json->object().end()) channel_json = it->second;
ValidateJsonArraySize(channel_json, kLoopIterations);
std::vector<intptr_t> uuids =
GetUuidListFromArray(channel_json.array());
for (int i = 0; i < kLoopIterations; ++i) {
// only the even uuids will still be present.
EXPECT_EQ((i + 1) * 2, uuids[i]);
}
done.Notify();
});
done.WaitForNotification();
std::string json_str = ChannelzRegistry::GetTopChannels(0);
auto parsed_json = JsonParse(json_str);
ASSERT_TRUE(parsed_json.ok()) << parsed_json.status();
ASSERT_EQ(parsed_json->type(), Json::Type::kObject);
Json channel_json;
auto it = parsed_json->object().find("channel");
if (it != parsed_json->object().end()) channel_json = it->second;
ValidateJsonArraySize(channel_json, kLoopIterations);
std::vector<intptr_t> uuids = GetUuidListFromArray(channel_json.array());
for (int i = 0; i < kLoopIterations; ++i) {
// only the even uuids will still be present.
EXPECT_EQ((i + 1) * 2, uuids[i]);
}
}
TEST_F(ChannelzRegistryBasedTest, InternalChannelTest) {

@ -34,8 +34,7 @@ grpc_cc_library(
deps = [
"//src/core:lb_policy",
"//src/core:subchannel_interface",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine:mock_event_engine",
],
)
@ -83,9 +82,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = [
"no_test_ios",
],
tags = ["no_test_ios"],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -120,9 +117,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = [
"no_test_ios",
],
tags = ["no_test_ios"],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -155,9 +150,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = [
"no_test_ios",
],
tags = ["no_test_ios"],
uses_event_engine = False,
uses_polling = False,
deps = [
@ -208,9 +201,7 @@ grpc_cc_test(
"gtest",
],
language = "C++",
tags = [
"no_test_ios",
],
tags = ["no_test_ios"],
uses_event_engine = False,
uses_polling = False,
deps = [

@ -20,6 +20,7 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <chrono>
@ -28,14 +29,15 @@
#include <initializer_list>
#include <map>
#include <memory>
#include <ratio>
#include <set>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
@ -84,9 +86,7 @@
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/uri/uri_parser.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/event_engine/mock_event_engine.h"
namespace grpc_core {
namespace testing {
@ -106,7 +106,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// given SubchannelState object.
class FakeSubchannel : public SubchannelInterface {
public:
explicit FakeSubchannel(SubchannelState* state) : state_(state) {}
FakeSubchannel(SubchannelState* state,
std::shared_ptr<WorkSerializer> work_serializer)
: state_(state), work_serializer_(std::move(work_serializer)) {}
~FakeSubchannel() override {
if (orca_watcher_ != nullptr) {
@ -151,13 +153,27 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
gpr_log(GPR_INFO, "notifying watcher: state=%s status=%s",
ConnectivityStateName(new_state), status.ToString().c_str());
watcher_->OnConnectivityStateChange(new_state, status);
watcher()->OnConnectivityStateChange(new_state, status);
}
private:
std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
SubchannelInterface::ConnectivityStateWatcherInterface* watcher()
const {
return Match(
watcher_,
[](const std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); },
[](const std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>&
watcher) { return watcher.get(); });
}
absl::variant<
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>,
std::shared_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>>
watcher_;
};
@ -165,10 +181,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::unique_ptr<
SubchannelInterface::ConnectivityStateWatcherInterface>
watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
auto* watcher_ptr = watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
state_->work_serializer(), std::move(watcher));
work_serializer_, std::move(watcher));
watcher_map_[watcher_ptr] = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
@ -176,7 +192,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
auto it = watcher_map_.find(watcher);
if (it == watcher_map_.end()) return;
state_->state_tracker_.RemoveWatcher(it->second);
@ -188,9 +204,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
state_->requested_connection_ = true;
}
void AddDataWatcher(
std::unique_ptr<DataWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)
override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
auto* w =
static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
@ -207,7 +222,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
auto connectivity_watcher = health_watcher_->TakeWatcher();
auto* connectivity_watcher_ptr = connectivity_watcher.get();
auto watcher_wrapper = MakeOrphanable<WatcherWrapper>(
state_->work_serializer(), std::move(connectivity_watcher));
work_serializer_, std::move(connectivity_watcher));
health_watcher_wrapper_ = watcher_wrapper.get();
state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN,
std::move(watcher_wrapper));
@ -220,7 +235,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
void CancelDataWatcher(DataWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) {
MutexLock lock(&state_->backend_metric_watcher_mu_);
auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
if (w->type() == OrcaProducer::Type()) {
@ -245,6 +260,7 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void ResetBackoff() override {}
SubchannelState* state_;
std::shared_ptr<WorkSerializer> work_serializer_;
std::map<SubchannelInterface::ConnectivityStateWatcherInterface*,
WatcherWrapper*>
watcher_map_;
@ -253,9 +269,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::unique_ptr<OrcaWatcher> orca_watcher_;
};
SubchannelState(absl::string_view address, LoadBalancingPolicyTest* test)
SubchannelState(absl::string_view address,
std::shared_ptr<WorkSerializer> work_serializer)
: address_(address),
test_(test),
work_serializer_(std::move(work_serializer)),
state_tracker_("LoadBalancingPolicyTest") {}
const std::string& address() const { return address_; }
@ -313,44 +330,16 @@ class LoadBalancingPolicyTest : public ::testing::Test {
<< "bug in test: " << ConnectivityStateName(state)
<< " must have OK status: " << status;
}
// Updating the state in the state tracker will enqueue
// notifications to watchers on the WorkSerializer. If any
// subchannel reports READY, the pick_first leaf policy will then
// start a health watch, whose initial notification will also be
// scheduled on the WorkSerializer. We don't want to return until
// all of those notifications have been delivered.
absl::Notification notification;
test_->work_serializer_->Run(
[&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*test_->work_serializer_) {
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(),
state, location);
}
gpr_log(GPR_INFO, "Setting state on tracker");
state_tracker_.SetState(state, status, "set from test");
// SetState() enqueued the connectivity state notifications for
// the subchannel, so we add another callback to the queue to be
// executed after that state notifications has been delivered.
gpr_log(GPR_INFO,
"Waiting for state notifications to be delivered");
test_->work_serializer_->Run(
[&]() {
gpr_log(GPR_INFO,
"State notifications delivered, waiting for health "
"notifications");
// Now the connectivity state notifications has been
// delivered. If the state reported was READY, then the
// pick_first leaf policy will have started a health watch, so
// we add another callback to the queue to be executed after
// the initial health watch notification has been delivered.
test_->work_serializer_->Run([&]() { notification.Notify(); },
DEBUG_LOCATION);
},
DEBUG_LOCATION);
},
work_serializer_->Run(
[this, state, status, validate_state_transition, location]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) {
if (validate_state_transition) {
AssertValidConnectivityStateTransition(state_tracker_.state(),
state, location);
}
state_tracker_.SetState(state, status, "set from test");
},
DEBUG_LOCATION);
notification.WaitForNotification();
gpr_log(GPR_INFO, "Health notifications delivered");
}
// Indicates if any of the associated SubchannelInterface objects
@ -362,8 +351,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
// To be invoked by FakeHelper.
RefCountedPtr<SubchannelInterface> CreateSubchannel() {
return MakeRefCounted<FakeSubchannel>(this);
RefCountedPtr<SubchannelInterface> CreateSubchannel(
std::shared_ptr<WorkSerializer> work_serializer) {
return MakeRefCounted<FakeSubchannel>(this, std::move(work_serializer));
}
// Sends an OOB backend metric report to all watchers.
@ -384,15 +374,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
}
std::shared_ptr<WorkSerializer> work_serializer() {
return test_->work_serializer_;
}
private:
const std::string address_;
LoadBalancingPolicyTest* const test_;
ConnectivityStateTracker state_tracker_
ABSL_GUARDED_BY(*test_->work_serializer_);
std::shared_ptr<WorkSerializer> work_serializer_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_);
Mutex requested_connection_mu_;
bool requested_connection_ ABSL_GUARDED_BY(&requested_connection_mu_) =
@ -424,7 +409,13 @@ class LoadBalancingPolicyTest : public ::testing::Test {
std::string ToString() const { return "RERESOLUTION"; }
};
explicit FakeHelper(LoadBalancingPolicyTest* test) : test_(test) {}
FakeHelper(LoadBalancingPolicyTest* test,
std::shared_ptr<WorkSerializer> work_serializer,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: test_(test),
work_serializer_(std::move(work_serializer)),
event_engine_(std::move(event_engine)) {}
bool QueueEmpty() {
MutexLock lock(&mu_);
@ -482,39 +473,6 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
private:
// A wrapper for a picker that hops into the WorkSerializer to
// release the ref to the picker.
class PickerWrapper : public LoadBalancingPolicy::SubchannelPicker {
public:
PickerWrapper(LoadBalancingPolicyTest* test,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
: test_(test), picker_(std::move(picker)) {
gpr_log(GPR_INFO, "creating wrapper %p for picker %p", this,
picker_.get());
}
void Orphan() override {
absl::Notification notification;
test_->work_serializer_->Run(
[notification = &notification,
picker = std::move(picker_)]() mutable {
picker.reset();
notification->Notify();
},
DEBUG_LOCATION);
notification.WaitForNotification();
}
LoadBalancingPolicy::PickResult Pick(
LoadBalancingPolicy::PickArgs args) override {
return picker_->Pick(args);
}
private:
LoadBalancingPolicyTest* const test_;
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_;
};
// Represents an event reported by the LB policy.
using Event = absl::variant<StateUpdate, ReresolutionRequested>;
@ -544,19 +502,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {
GPR_ASSERT(address_uri.ok());
it = test_->subchannel_pool_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(std::move(*address_uri), test_))
std::forward_as_tuple(std::move(*address_uri),
work_serializer_))
.first;
}
return it->second.CreateSubchannel();
return it->second.CreateSubchannel(work_serializer_);
}
void UpdateState(
grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
MutexLock lock(&mu_);
StateUpdate update{
state, status,
MakeRefCounted<PickerWrapper>(test_, std::move(picker))};
StateUpdate update{state, status, std::move(picker)};
gpr_log(GPR_INFO, "state update from LB policy: %s",
update.ToString().c_str());
queue_.push_back(std::move(update));
@ -579,12 +536,14 @@ class LoadBalancingPolicyTest : public ::testing::Test {
}
grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
return test_->fuzzing_ee_.get();
return event_engine_.get();
}
void AddTraceEvent(TraceSeverity, absl::string_view) override {}
LoadBalancingPolicyTest* test_;
std::shared_ptr<WorkSerializer> work_serializer_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
Mutex mu_;
std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
@ -674,36 +633,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
const absl::optional<BackendMetricData> backend_metric_data_;
};
explicit LoadBalancingPolicyTest(absl::string_view lb_policy_name)
: lb_policy_name_(lb_policy_name) {}
void SetUp() override {
// Order is important here: Fuzzing EE needs to be created before
// grpc_init(), and the POSIX EE (which is used by the WorkSerializer)
// needs to be created after grpc_init().
fuzzing_ee_ =
std::make_shared<grpc_event_engine::experimental::FuzzingEventEngine>(
grpc_event_engine::experimental::FuzzingEventEngine::Options(),
fuzzing_event_engine::Actions());
grpc_init();
event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
work_serializer_ = std::make_shared<WorkSerializer>(event_engine_);
auto helper = std::make_unique<FakeHelper>(this);
helper_ = helper.get();
LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper),
ChannelArgs()};
lb_policy_ =
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
lb_policy_name_, std::move(args));
GPR_ASSERT(lb_policy_ != nullptr);
}
LoadBalancingPolicyTest()
: work_serializer_(std::make_shared<WorkSerializer>()) {}
void TearDown() override {
fuzzing_ee_->FuzzingDone();
// Make sure pickers (and transitively, subchannels) are unreffed before
// destroying the fixture.
WaitForWorkSerializerToFlush();
work_serializer_.reset();
// Note: Can't safely trigger this from inside the FakeHelper dtor,
// because if there is a picker in the queue that is holding a ref
// to the LB policy, that will prevent the LB policy from being
@ -711,18 +644,20 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// (This will cause an ASAN failure, but it will not display the
// queued events, so the failure will be harder to diagnose.)
helper_->ExpectQueueEmpty();
lb_policy_.reset();
fuzzing_ee_->TickUntilIdle();
grpc_event_engine::experimental::WaitForSingleOwner(
std::move(event_engine_));
event_engine_.reset();
grpc_shutdown_blocking();
fuzzing_ee_.reset();
}
LoadBalancingPolicy* lb_policy() const {
GPR_ASSERT(lb_policy_ != nullptr);
return lb_policy_.get();
// Creates an LB policy of the specified name.
// Creates a new FakeHelper for the new LB policy, and sets helper_ to
// point to the FakeHelper.
OrphanablePtr<LoadBalancingPolicy> MakeLbPolicy(absl::string_view name) {
auto helper =
std::make_unique<FakeHelper>(this, work_serializer_, event_engine_);
helper_ = helper.get();
LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper),
ChannelArgs()};
return CoreConfiguration::Get()
.lb_policy_registry()
.CreateLoadBalancingPolicy(name, std::move(args));
}
// Creates an LB policy config from json.
@ -764,41 +699,14 @@ class LoadBalancingPolicyTest : public ::testing::Test {
LoadBalancingPolicy* lb_policy) {
ExecCtx exec_ctx;
absl::Status status;
// When the LB policy gets the update, it will create new
// subchannels, and it will register connectivity state watchers and
// optionally health watchers for each one. We don't want to return
// until all the initial notifications for all of those watchers
// have been delivered to the LB policy.
absl::Notification notification;
work_serializer_->Run(
[&]() {
status = lb_policy->UpdateLocked(std::move(update_args));
// UpdateLocked() enqueued the initial connectivity state
// notifications for the subchannels, so we add another
// callback to the queue to be executed after those initial
// state notifications have been delivered.
gpr_log(GPR_INFO,
"Applied update, waiting for initial connectivity state "
"notifications");
work_serializer_->Run(
[&]() {
gpr_log(GPR_INFO,
"Initial connectivity state notifications delivered; "
"waiting for health notifications");
// Now that the initial state notifications have been
// delivered, the queue will contain the health watch
// notifications for any subchannels in state READY,
// so we add another callback to the queue to be
// executed after those health watch notifications have
// been delivered.
work_serializer_->Run([&]() { notification.Notify(); },
DEBUG_LOCATION);
},
DEBUG_LOCATION);
notification.Notify();
},
DEBUG_LOCATION);
notification.WaitForNotification();
gpr_log(GPR_INFO, "health notifications delivered");
return status;
}
@ -951,7 +859,6 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return false; // Stop.
},
location);
gpr_log(GPR_INFO, "done waiting for expected RR addresses");
return retval;
}
@ -1108,30 +1015,18 @@ class LoadBalancingPolicyTest : public ::testing::Test {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> ExpectRoundRobinStartup(
absl::Span<const absl::string_view> addresses) {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
EXPECT_NE(subchannel, nullptr);
if (subchannel == nullptr) return nullptr;
// RR should ask each subchannel to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Expect the initial CONNECTNG update with a picker that queues.
if (i == 0) ExpectConnectingUpdate();
// The connection attempts succeed.
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
if (i == 0) {
// When the first subchannel becomes READY, accept any number of
// CONNECTING updates with a picker that queues followed by a READY
// update with a picker that repeatedly returns only the first address.
picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
// When each subsequent subchannel becomes READY, we accept any number
// of READY updates where the picker returns only the previously
// connected subchannel(s) followed by a READY update where the picker
// returns the previously connected subchannel(s) *and* the newly
// connected subchannel.
picker = WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
@ -1196,57 +1091,87 @@ class LoadBalancingPolicyTest : public ::testing::Test {
SubchannelKey key(MakeAddress(address), args);
auto it = subchannel_pool_
.emplace(std::piecewise_construct, std::forward_as_tuple(key),
std::forward_as_tuple(address, this))
std::forward_as_tuple(address, work_serializer_))
.first;
return &it->second;
}
void WaitForWorkSerializerToFlush() {
gpr_log(GPR_INFO, "waiting for WorkSerializer to flush...");
absl::Notification notification;
work_serializer_->Run([&]() { notification.Notify(); }, DEBUG_LOCATION);
notification.WaitForNotification();
gpr_log(GPR_INFO, "WorkSerializer flush complete");
std::shared_ptr<WorkSerializer> work_serializer_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
grpc_event_engine::experimental::GetDefaultEventEngine();
FakeHelper* helper_ = nullptr;
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
};
// A subclass to be used for LB policies that start timers.
// Injects a mock EventEngine and provides the necessary framework for
// incrementing time and handling timer callbacks.
class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest {
protected:
// A custom time cache for which InvalidateCache() is a no-op. This
// ensures that when the timer callback instantiates its own ExecCtx
// and therefore its own ScopedTimeCache, it continues to see the time
// that we are injecting in the test.
class TestTimeCache final : public Timestamp::ScopedSource {
public:
TestTimeCache() : cached_time_(previous()->Now()) {}
Timestamp Now() override { return cached_time_; }
void InvalidateCache() override {}
void IncrementBy(Duration duration) { cached_time_ += duration; }
private:
Timestamp cached_time_;
};
TimeAwareLoadBalancingPolicyTest() {
auto mock_ee =
std::make_shared<grpc_event_engine::experimental::MockEventEngine>();
auto capture = [this](std::chrono::duration<int64_t, std::nano> duration,
absl::AnyInvocable<void()> callback) {
CheckExpectedTimerDuration(duration);
intptr_t key = next_key_++;
timer_callbacks_[key] = std::move(callback);
return grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0};
};
ON_CALL(*mock_ee,
RunAfter(::testing::_, ::testing::A<absl::AnyInvocable<void()>>()))
.WillByDefault(capture);
auto cancel =
[this](
grpc_event_engine::experimental::EventEngine::TaskHandle handle) {
auto it = timer_callbacks_.find(handle.keys[0]);
if (it == timer_callbacks_.end()) return false;
timer_callbacks_.erase(it);
return true;
};
ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel);
// Store in base class, to make it visible to the LB policy.
event_engine_ = std::move(mock_ee);
}
void IncrementTimeBy(Duration duration) {
fuzzing_ee_->TickForDuration(duration);
// Flush WorkSerializer, in case the timer callback enqueued anything.
WaitForWorkSerializerToFlush();
~TimeAwareLoadBalancingPolicyTest() override {
EXPECT_TRUE(timer_callbacks_.empty())
<< "WARNING: Test did not run all timer callbacks";
}
void SetExpectedTimerDuration(
absl::optional<grpc_event_engine::experimental::EventEngine::Duration>
duration) {
if (duration.has_value()) {
fuzzing_ee_->SetRunAfterDurationCallback(
[expected = *duration](
grpc_event_engine::experimental::EventEngine::Duration duration) {
EXPECT_EQ(duration, expected)
<< "Expected: " << expected.count()
<< "ns\nActual: " << duration.count() << "ns";
});
} else {
fuzzing_ee_->SetRunAfterDurationCallback(nullptr);
}
void RunTimerCallback() {
ASSERT_EQ(timer_callbacks_.size(), 1UL);
auto it = timer_callbacks_.begin();
ASSERT_NE(it->second, nullptr);
std::move(it->second)();
timer_callbacks_.erase(it);
}
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
fuzzing_ee_;
// TODO(ctiller): this is a normal event engine, yet it gets its time measure
// from fuzzing_ee_ -- results are likely to be a little funky, but seem to do
// well enough for the tests we have today.
// We should transition everything here to just use fuzzing_ee_, but that
// needs some thought on how to Tick() at appropriate times, as there are
// Notification objects buried everywhere in this code, and
// WaitForNotification is deeply incompatible with a single threaded event
// engine that doesn't run callbacks until its public Tick method is called.
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
std::shared_ptr<WorkSerializer> work_serializer_;
FakeHelper* helper_ = nullptr;
std::map<SubchannelKey, SubchannelState> subchannel_pool_;
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
const absl::string_view lb_policy_name_;
// Called when the LB policy starts a timer.
// May be overridden by subclasses.
virtual void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration) {}
std::map<intptr_t, absl::AnyInvocable<void()>> timer_callbacks_;
intptr_t next_key_ = 1;
TestTimeCache time_cache_;
};
} // namespace testing

@ -21,6 +21,7 @@
#include <array>
#include <chrono>
#include <memory>
#include <ratio>
#include <string>
#include <utility>
#include <vector>
@ -30,12 +31,14 @@
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
@ -47,7 +50,7 @@ namespace grpc_core {
namespace testing {
namespace {
class OutlierDetectionTest : public LoadBalancingPolicyTest {
class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
@ -143,12 +146,7 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest {
};
OutlierDetectionTest()
: LoadBalancingPolicyTest("outlier_detection_experimental") {}
void SetUp() override {
LoadBalancingPolicyTest::SetUp();
SetExpectedTimerDuration(std::chrono::seconds(10));
}
: lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {}
absl::optional<std::string> DoPickWithFailedCall(
LoadBalancingPolicy::SubchannelPicker* picker) {
@ -166,13 +164,25 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest {
}
return address;
}
void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_internal_)
<< "Expected: " << expected_internal_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration expected_internal_ =
std::chrono::seconds(10);
};
TEST_F(OutlierDetectionTest, Basic) {
constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443";
// Send an update containing one address.
absl::Status status = ApplyUpdate(
BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy());
BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the address.
auto* subchannel = FindSubchannel(kAddressUri);
@ -206,7 +216,7 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
.SetFailurePercentageMinimumHosts(1)
.SetFailurePercentageRequestVolume(1)
.Build()),
lb_policy());
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// Expect normal startup.
auto picker = ExpectRoundRobinStartup(kAddresses);
@ -217,7 +227,8 @@ TEST_F(OutlierDetectionTest, FailurePercentage) {
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
IncrementTimeBy(Duration::Seconds(10));
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection complete");
if (!IsRoundRobinDelegateToPickFirstEnabled()) ExpectReresolutionRequest();
// Expect a picker update.
@ -240,7 +251,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
.SetFailurePercentageRequestVolume(1)
.SetChildPolicy({{"pick_first", Json::FromObject({})}})
.Build()),
lb_policy());
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for the first address.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -268,7 +279,8 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
ASSERT_TRUE(address.has_value());
gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
// Advance time and run the timer callback to trigger ejection.
IncrementTimeBy(Duration::Seconds(10));
time_cache_.IncrementBy(Duration::Seconds(10));
RunTimerCallback();
gpr_log(GPR_INFO, "### ejection timer pass complete");
// Subchannel should not be ejected.
ExpectQueueEmpty();
@ -283,5 +295,8 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
return RUN_ALL_TESTS();
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -25,7 +25,6 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gmock/gmock.h"
@ -35,6 +34,7 @@
#include <grpc/support/json.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/json/json.h"
@ -48,7 +48,7 @@ namespace {
class PickFirstTest : public LoadBalancingPolicyTest {
protected:
PickFirstTest() : LoadBalancingPolicyTest("pick_first") {}
PickFirstTest() : lb_policy_(MakeLbPolicy("pick_first")) {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakePickFirstConfig(
absl::optional<bool> shuffle_address_list = absl::nullopt) {
@ -65,19 +65,9 @@ class PickFirstTest : public LoadBalancingPolicyTest {
void GetOrderAddressesArePicked(
absl::Span<const absl::string_view> addresses,
std::vector<absl::string_view>* out_address_order) {
work_serializer_->Run([&]() { lb_policy_->ExitIdleLocked(); },
DEBUG_LOCATION);
out_address_order->clear();
// Note: ExitIdle() will enqueue a bunch of connectivity state
// notifications on the WorkSerializer, and we want to wait until
// those are delivered to the LB policy.
absl::Notification notification;
work_serializer_->Run(
[&]() {
lb_policy()->ExitIdleLocked();
work_serializer_->Run([&]() { notification.Notify(); },
DEBUG_LOCATION);
},
DEBUG_LOCATION);
notification.WaitForNotification();
// Construct a map of subchannel to address.
// We will remove entries as each subchannel starts to connect.
std::map<SubchannelState*, absl::string_view> subchannels;
@ -132,6 +122,8 @@ class PickFirstTest : public LoadBalancingPolicyTest {
subchannels.erase(subchannel);
}
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
};
TEST_F(PickFirstTest, FirstAddressWorks) {
@ -139,7 +131,7 @@ TEST_F(PickFirstTest, FirstAddressWorks) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -172,7 +164,7 @@ TEST_F(PickFirstTest, FirstAddressFails) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -224,7 +216,7 @@ TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) {
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for all addresses.
auto* subchannel3 = FindSubchannel(kAddresses[2]);
@ -267,7 +259,7 @@ TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) {
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
@ -313,7 +305,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
absl::UnavailableError("failed to connect"),
/*validate_state_transition=*/false);
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should request re-resolution.
ExpectReresolutionRequest();
@ -332,7 +324,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) {
constexpr std::array<absl::string_view, 2> kAddresses2 = {
kAddresses[0], "ipv4:127.0.0.1:445"};
status = ApplyUpdate(BuildUpdate(kAddresses2, MakePickFirstConfig(false)),
lb_policy());
lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// The LB policy should have created a subchannel for the new address.
auto* subchannel3 = FindSubchannel(kAddresses2[1]);
@ -357,7 +349,7 @@ TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -418,7 +410,7 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
constexpr std::array<absl::string_view, 2> kAddresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
// LB policy should have created a subchannel for both addresses.
auto* subchannel = FindSubchannel(kAddresses[0]);
@ -453,13 +445,6 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) {
// By checking the picker, we told the LB policy to trigger a new
// connection attempt, so it should start over with the first
// subchannel.
// Note that the picker will have enqueued the ExitIdle() call in the
// WorkSerializer, so the first flush will execute that call. But
// executing that call will result in enqueueing subchannel
// connectivity state notifications, so we need to flush again to make
// sure all of that work is done before we continue.
WaitForWorkSerializerToFlush();
WaitForWorkSerializerToFlush();
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel starts connecting.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
@ -488,7 +473,7 @@ TEST_F(PickFirstTest, WithShuffle) {
bool shuffled = false;
for (size_t i = 0; i < kMaxTries; ++i) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
GetOrderAddressesArePicked(kAddresses, &addresses_after_update);
if (absl::MakeConstSpan(addresses_after_update) !=
@ -511,7 +496,7 @@ TEST_F(PickFirstTest, ShufflingDisabled) {
constexpr static size_t kMaxAttempts = 5;
for (size_t attempt = 0; attempt < kMaxAttempts; ++attempt) {
absl::Status status = ApplyUpdate(
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy());
BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get());
EXPECT_TRUE(status.ok()) << status;
std::vector<absl::string_view> address_order;
GetOrderAddressesArePicked(kAddresses, &address_order);
@ -525,5 +510,8 @@ TEST_F(PickFirstTest, ShufflingDisabled) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
return RUN_ALL_TESTS();
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -14,6 +14,8 @@
// limitations under the License.
//
#include <stddef.h>
#include <array>
#include "absl/status/status.h"
@ -21,6 +23,11 @@
#include "absl/types/span.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
@ -30,33 +37,68 @@ namespace {
class RoundRobinTest : public LoadBalancingPolicyTest {
protected:
RoundRobinTest() : LoadBalancingPolicyTest("round_robin") {}
RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {}
void ExpectStartup(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, nullptr), lb_policy_.get()),
absl::OkStatus());
// RR should have created a subchannel for each address.
for (size_t i = 0; i < addresses.size(); ++i) {
auto* subchannel = FindSubchannel(addresses[i]);
ASSERT_NE(subchannel, nullptr) << "Address: " << addresses[i];
// RR should ask each subchannel to connect.
EXPECT_TRUE(subchannel->ConnectionRequested());
// The subchannel will connect successfully.
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
// Expect the initial CONNECTNG update with a picker that queues.
if (i == 0) ExpectConnectingUpdate();
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
// As each subchannel becomes READY, we should get a new picker that
// includes the behavior. Note that there may be any number of
// duplicate updates for the previous state in the queue before the
// update that we actually want to see.
if (i == 0) {
// When the first subchannel becomes READY, accept any number of
// CONNECTING updates with a picker that queues followed by a READY
// update with a picker that repeatedly returns only the first address.
auto picker = WaitForConnected();
ExpectRoundRobinPicks(picker.get(), {addresses[0]});
} else {
// When each subsequent subchannel becomes READY, we accept any number
// of READY updates where the picker returns only the previously
// connected subchannel(s) followed by a READY update where the picker
// returns the previously connected subchannel(s) *and* the newly
// connected subchannel.
WaitForRoundRobinListChange(
absl::MakeSpan(addresses).subspan(0, i),
absl::MakeSpan(addresses).subspan(0, i + 1));
}
}
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
};
TEST_F(RoundRobinTest, Basic) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, nullptr), lb_policy()),
absl::OkStatus());
ExpectRoundRobinStartup(kAddresses);
ExpectStartup(kAddresses);
}
TEST_F(RoundRobinTest, AddressUpdates) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, nullptr), lb_policy()),
absl::OkStatus());
ExpectRoundRobinStartup(kAddresses);
ExpectStartup(kAddresses);
// Send update to remove address 2.
EXPECT_EQ(
ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2), nullptr),
lb_policy()),
lb_policy_.get()),
absl::OkStatus());
WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2));
// Send update to remove address 0 and re-add address 2.
EXPECT_EQ(
ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2), nullptr),
lb_policy()),
lb_policy_.get()),
absl::OkStatus());
WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2),
absl::MakeSpan(kAddresses).last(2));
@ -73,5 +115,8 @@ TEST_F(RoundRobinTest, AddressUpdates) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
return RUN_ALL_TESTS();
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -22,6 +22,7 @@
#include <chrono>
#include <map>
#include <memory>
#include <ratio>
#include <string>
#include <utility>
#include <vector>
@ -35,12 +36,14 @@
#include "absl/types/span.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
@ -64,7 +67,7 @@ BackendMetricData MakeBackendMetricData(double app_utilization, double qps,
return b;
}
class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
protected:
class ConfigBuilder {
public:
@ -110,11 +113,8 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
Json::Object json_;
};
WeightedRoundRobinTest() : LoadBalancingPolicyTest("weighted_round_robin") {}
void SetUp() override {
LoadBalancingPolicyTest::SetUp();
SetExpectedTimerDuration(std::chrono::seconds(1));
WeightedRoundRobinTest() {
lb_policy_ = MakeLbPolicy("weighted_round_robin");
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
@ -125,7 +125,7 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
SourceLocation location = SourceLocation()) {
if (update_addresses.empty()) update_addresses = addresses;
EXPECT_EQ(ApplyUpdate(BuildUpdate(update_addresses, config_builder.Build()),
lb_policy()),
lb_policy_.get()),
absl::OkStatus());
// Expect the initial CONNECTNG update with a picker that queues.
ExpectConnectingUpdate(location);
@ -311,11 +311,24 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
if (*picker == nullptr) return false;
} else if (run_timer_callbacks) {
gpr_log(GPR_INFO, "running timer callback...");
// Increment time and run any timer callbacks.
IncrementTimeBy(Duration::Seconds(1));
RunTimerCallback();
}
// Increment time.
time_cache_.IncrementBy(Duration::Seconds(1));
}
}
void CheckExpectedTimerDuration(
grpc_event_engine::experimental::EventEngine::Duration duration)
override {
EXPECT_EQ(duration, expected_weight_update_interval_)
<< "Expected: " << expected_weight_update_interval_.count() << "ns"
<< "\n Actual: " << duration.count() << "ns";
}
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
grpc_event_engine::experimental::EventEngine::Duration
expected_weight_update_interval_ = std::chrono::seconds(1);
};
TEST_F(WeightedRoundRobinTest, Basic) {
@ -630,7 +643,7 @@ TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
SetExpectedTimerDuration(std::chrono::seconds(2));
expected_weight_update_interval_ = std::chrono::seconds(2);
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Seconds(2)));
ASSERT_NE(picker, nullptr);
@ -648,7 +661,7 @@ TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
SetExpectedTimerDuration(std::chrono::milliseconds(100));
expected_weight_update_interval_ = std::chrono::milliseconds(100);
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses,
ConfigBuilder().SetWeightUpdatePeriod(Duration::Milliseconds(10)));
@ -684,7 +697,8 @@ TEST_F(WeightedRoundRobinTest, WeightExpirationPeriod) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time to make weights stale and trigger the timer callback
// to recompute weights.
IncrementTimeBy(Duration::Seconds(2));
time_cache_.IncrementBy(Duration::Seconds(2));
RunTimerCallback();
// Picker should now be falling back to round-robin.
ExpectWeightedRoundRobinPicks(
picker.get(), {},
@ -711,7 +725,8 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time to make weights stale and trigger the timer callback
// to recompute weights.
IncrementTimeBy(Duration::Seconds(2));
time_cache_.IncrementBy(Duration::Seconds(2));
RunTimerCallback();
// Picker should now be falling back to round-robin.
ExpectWeightedRoundRobinPicks(
picker.get(), {},
@ -729,7 +744,8 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
{{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time past the blackout period. This should cause the
// weights to be used.
IncrementTimeBy(Duration::Seconds(1));
time_cache_.IncrementBy(Duration::Seconds(1));
RunTimerCallback();
ExpectWeightedRoundRobinPicks(
picker.get(), {},
{{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 1}});
@ -775,7 +791,8 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
// Advance time to exceed the blackout period and trigger the timer
// callback to recompute weights.
IncrementTimeBy(Duration::Seconds(1));
time_cache_.IncrementBy(Duration::Seconds(1));
RunTimerCallback();
ExpectWeightedRoundRobinPicks(
picker.get(),
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
@ -807,9 +824,9 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodDoesNotGetResetAfterUpdate) {
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Send a duplicate update with the same addresses and config.
EXPECT_EQ(
ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()), lb_policy()),
absl::OkStatus());
EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()),
lb_policy_.get()),
absl::OkStatus());
// Note that we have not advanced time, so if the update incorrectly
// triggers resetting the blackout period, none of the weights will
// actually be used.
@ -852,5 +869,8 @@ TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
return RUN_ALL_TESTS();
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -34,6 +34,7 @@
#include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
#include "src/core/ext/xds/xds_health_status.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
@ -47,7 +48,7 @@ namespace {
class XdsOverrideHostTest : public LoadBalancingPolicyTest {
protected:
XdsOverrideHostTest()
: LoadBalancingPolicyTest("xds_override_host_experimental") {}
: policy_(MakeLbPolicy("xds_override_host_experimental")) {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakeXdsOverrideHostConfig(
absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
@ -71,7 +72,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses) {
EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()),
lb_policy()),
policy_.get()),
absl::OkStatus());
return ExpectRoundRobinStartup(addresses);
}
@ -95,7 +96,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
update.addresses->push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus());
EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus());
}
CallAttributes MakeOverrideHostAttribute(absl::string_view host) {
@ -104,6 +105,8 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
std::make_unique<XdsOverrideHostAttribute>(host));
return override_host_attributes;
}
OrphanablePtr<LoadBalancingPolicy> policy_;
};
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
@ -115,7 +118,7 @@ TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
EXPECT_EQ(
ApplyUpdate(
BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
lb_policy()),
policy_.get()),
absl::InvalidArgumentError("Missing policy config"));
}
@ -161,7 +164,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
// Some other address is gone
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[1]},
MakeXdsOverrideHostConfig()),
lb_policy()),
policy_.get()),
absl::OkStatus());
// Wait for LB policy to return a new picker that uses the updated
// addresses. We can't use the host override for this, because then
@ -175,7 +178,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
// "Our" address is gone so others get returned in round-robin order
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[2]},
MakeXdsOverrideHostConfig()),
lb_policy()),
policy_.get()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
// In this case, we can pass call_attributes while we wait instead of
@ -187,7 +190,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
// And now it is back
EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[1], kAddresses[2]},
MakeXdsOverrideHostConfig()),
lb_policy()),
policy_.get()),
absl::OkStatus());
// Wait for LB policy to return the new picker.
picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
@ -335,7 +338,6 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
// picks where the override host is CONNECTING. All picks without an
// override host should not use this host.
gpr_log(GPR_INFO, "### subchannel starts reconnecting");
WaitForWorkSerializerToFlush();
EXPECT_TRUE(subchannel->ConnectionRequested());
ExpectQueueEmpty();
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
@ -467,5 +469,8 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
return RUN_ALL_TESTS();
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -411,8 +411,7 @@ static void test_cooldown() {
TEST(DnsResolverCooldownTest, MainTest) {
grpc_init();
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
g_work_serializer = &work_serializer;
g_default_dns_lookup_ares = grpc_dns_lookup_hostname_ares;

@ -87,8 +87,7 @@ static void test_fails(grpc_core::ResolverFactory* factory,
}
TEST(DnsResolverTest, MainTest) {
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
g_work_serializer = &work_serializer;
grpc_core::ResolverFactory* dns = grpc_core::CoreConfiguration::Get()

@ -22,11 +22,9 @@
#include <string.h>
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -42,8 +40,6 @@
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
@ -59,14 +55,12 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
void SetExpectedAndEvent(grpc_core::Resolver::Result expected,
gpr_event* ev) {
grpc_core::MutexLock lock(&mu_);
ASSERT_EQ(ev_, nullptr);
expected_ = std::move(expected);
ev_ = ev;
}
void ReportResult(grpc_core::Resolver::Result actual) override {
grpc_core::MutexLock lock(&mu_);
ASSERT_NE(ev_, nullptr);
// We only check the addresses, because that's the only thing
// explicitly set by the test via
@ -81,9 +75,8 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
}
private:
grpc_core::Mutex mu_;
grpc_core::Resolver::Result expected_ ABSL_GUARDED_BY(mu_);
gpr_event* ev_ ABSL_GUARDED_BY(mu_) = nullptr;
grpc_core::Resolver::Result expected_;
gpr_event* ev_ = nullptr;
};
static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
@ -131,18 +124,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() {
TEST(FakeResolverTest, FakeResolver) {
grpc_core::ExecCtx exec_ctx;
std::shared_ptr<grpc_core::WorkSerializer> work_serializer =
std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
auto synchronously = [work_serializer](std::function<void()> do_this_thing) {
grpc_core::Notification notification;
work_serializer->Run(
[do_this_thing = std::move(do_this_thing), &notification]() mutable {
do_this_thing();
notification.Notify();
},
DEBUG_LOCATION);
notification.WaitForNotification();
};
std::make_shared<grpc_core::WorkSerializer>();
// Create resolver.
ResultHandler* result_handler = new ResultHandler();
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
@ -152,7 +134,7 @@ TEST(FakeResolverTest, FakeResolver) {
work_serializer, response_generator.get(),
std::unique_ptr<grpc_core::Resolver::ResultHandler>(result_handler));
ASSERT_NE(resolver.get(), nullptr);
synchronously([resolver = resolver.get()] { resolver->StartLocked(); });
resolver->StartLocked();
// Test 1: normal resolution.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
@ -161,7 +143,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event ev1;
gpr_event_init(&ev1);
result_handler->SetExpectedAndEvent(result, &ev1);
response_generator->SetResponseSynchronously(std::move(result));
response_generator->SetResponse(std::move(result));
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 2: update resolution.
@ -172,7 +154,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event ev2;
gpr_event_init(&ev2);
result_handler->SetExpectedAndEvent(result, &ev2);
response_generator->SetResponseSynchronously(std::move(result));
response_generator->SetResponse(std::move(result));
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 3: normal re-resolution.
@ -186,11 +168,10 @@ TEST(FakeResolverTest, FakeResolver) {
result_handler->SetExpectedAndEvent(reresolution_result, &ev3);
// Set reresolution_results.
// No result will be returned until re-resolution is requested.
response_generator->SetReresolutionResponseSynchronously(reresolution_result);
response_generator->SetReresolutionResponse(reresolution_result);
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
synchronously(
[resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 4: repeat re-resolution.
@ -201,8 +182,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event_init(&ev4);
result_handler->SetExpectedAndEvent(std::move(reresolution_result), &ev4);
// Trigger a re-resolution.
synchronously(
[resolver = resolver.get()] { resolver->RequestReresolutionLocked(); });
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 5: normal resolution.
@ -213,7 +193,7 @@ TEST(FakeResolverTest, FakeResolver) {
gpr_event ev5;
gpr_event_init(&ev5);
result_handler->SetExpectedAndEvent(result, &ev5);
response_generator->SetResponseSynchronously(std::move(result));
response_generator->SetResponse(std::move(result));
grpc_core::ExecCtx::Get()->Flush();
ASSERT_NE(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)), nullptr);
// Test 6: no-op.

@ -28,7 +28,6 @@
#include <grpc/support/log.h>
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -89,8 +88,7 @@ static void test_fails(grpc_core::ResolverFactory* factory,
}
TEST(SockaddrResolverTest, MainTest) {
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
g_work_serializer = &work_serializer;
grpc_core::ResolverFactory* ipv4 = grpc_core::CoreConfiguration::Get()

@ -80,7 +80,6 @@ void run_test(bool wait_for_ready) {
grpc_core::CqVerifier::tag(1), nullptr));
{
response_generator->WaitForResolverSet();
grpc_core::ExecCtx exec_ctx;
response_generator->SetFailure();
}

@ -221,10 +221,9 @@ grpc_cc_library(
"absl/time",
],
deps = [
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_unsecure",
"//src/core:channel_args_endpoint_config",
"//src/core:event_engine_common",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:memory_quota",
"//src/core:notification",

@ -202,12 +202,6 @@ void FuzzingEventEngine::TickForDuration(grpc_core::Duration d) {
TickUntilTimestamp(grpc_core::Timestamp::Now() + d);
}
void FuzzingEventEngine::SetRunAfterDurationCallback(
absl::AnyInvocable<void(Duration)> callback) {
grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
run_after_duration_callback_ = std::move(callback);
}
FuzzingEventEngine::Time FuzzingEventEngine::Now() {
grpc_core::MutexLock lock(&*now_mu_);
return now_;
@ -440,10 +434,8 @@ EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
// TODO(ctiller): do something with the timeout
// Schedule a timer to run (with some fuzzer selected delay) the on_connect
// callback.
grpc_core::MutexLock lock(&*mu_);
auto task_handle = RunAfterLocked(
RunType::kRunAfter, Duration(0),
[this, addr, on_connect = std::move(on_connect)]() mutable {
auto task_handle = RunAfter(
Duration(0), [this, addr, on_connect = std::move(on_connect)]() mutable {
// Check for a legal address and extract the target port number.
auto port = ResolvedAddressGetPort(addr);
grpc_core::MutexLock lock(&*mu_);
@ -497,14 +489,11 @@ FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
}
void FuzzingEventEngine::Run(Closure* closure) {
grpc_core::MutexLock lock(&*mu_);
RunAfterLocked(RunType::kRunAfter, Duration::zero(),
[closure]() { closure->Run(); });
RunAfter(Duration::zero(), closure);
}
void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
grpc_core::MutexLock lock(&*mu_);
RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
RunAfter(Duration::zero(), std::move(closure));
}
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
@ -514,12 +503,6 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
Duration when, absl::AnyInvocable<void()> closure) {
{
grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
if (run_after_duration_callback_ != nullptr) {
run_after_duration_callback_(when);
}
}
grpc_core::MutexLock lock(&*mu_);
// (b/258949216): Cap it to one year to avoid integer overflow errors.
return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),

@ -79,10 +79,6 @@ class FuzzingEventEngine : public EventEngine {
// Tick for some grpc_core::Duration
void TickForDuration(grpc_core::Duration d) ABSL_LOCKS_EXCLUDED(mu_);
// Sets a callback to be invoked any time RunAfter() is called.
// Allows tests to verify the specified duration.
void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback);
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
@ -294,10 +290,6 @@ class FuzzingEventEngine : public EventEngine {
std::queue<std::queue<size_t>> write_sizes_for_future_connections_
ABSL_GUARDED_BY(mu_);
grpc_pick_port_functions previous_pick_port_functions_;
grpc_core::Mutex run_after_duration_callback_mu_;
absl::AnyInvocable<void(Duration)> run_after_duration_callback_
ABSL_GUARDED_BY(run_after_duration_callback_mu_);
};
} // namespace experimental

@ -143,12 +143,6 @@ class FuzzingResolverEventEngine
}
bool Cancel(TaskHandle /* handle */) override { return true; }
void Run(absl::AnyInvocable<void()> fn) override {
runner_.Run(std::move(fn));
}
void Run(Closure* fn) override { runner_.Run(fn); }
void Tick() { runner_.Tick(); }
private:
@ -265,7 +259,7 @@ DEFINE_PROTO_FUZZER(const event_engine_client_channel_resolver::Msg& msg) {
grpc_event_engine::experimental::GetDefaultEventEngine());
{
// scoped to ensure the resolver is orphaned when done resolving.
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>(engine);
auto work_serializer = std::make_shared<grpc_core::WorkSerializer>();
EventEngineClientChannelDNSResolverFactory resolver_factory;
auto resolver_args = ConstructResolverArgs(
grpc_core::testing::CreateChannelArgsFromFuzzingConfiguration(

@ -383,7 +383,6 @@ grpc_cc_test(
deps = [
"//:gpr",
"//:grpc",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/util:grpc_test_util",
],
)

@ -32,22 +32,15 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/util/test_config.h"
using grpc_event_engine::experimental::GetDefaultEventEngine;
using grpc_event_engine::experimental::WaitForSingleOwner;
namespace grpc_core {
namespace {
TEST(WorkSerializerTest, NoOp) { WorkSerializer lock(GetDefaultEventEngine()); }
TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; }
TEST(WorkSerializerTest, ExecuteOneRun) {
WorkSerializer lock(GetDefaultEventEngine());
grpc_core::WorkSerializer lock;
gpr_event done;
gpr_event_init(&done);
lock.Run([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
@ -57,7 +50,7 @@ TEST(WorkSerializerTest, ExecuteOneRun) {
}
TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
WorkSerializer lock(GetDefaultEventEngine());
grpc_core::WorkSerializer lock;
gpr_event done;
gpr_event_init(&done);
lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast<void*>(1)); },
@ -70,7 +63,7 @@ TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) {
class TestThread {
public:
explicit TestThread(WorkSerializer* lock)
explicit TestThread(grpc_core::WorkSerializer* lock)
: lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
@ -111,14 +104,14 @@ class TestThread {
DEBUG_LOCATION);
}
WorkSerializer* lock_ = nullptr;
Thread thread_;
grpc_core::WorkSerializer* lock_ = nullptr;
grpc_core::Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(WorkSerializerTest, ExecuteMany) {
WorkSerializer lock(GetDefaultEventEngine());
grpc_core::WorkSerializer lock;
{
std::vector<std::unique_ptr<TestThread>> threads;
for (size_t i = 0; i < 10; ++i) {
@ -129,7 +122,7 @@ TEST(WorkSerializerTest, ExecuteMany) {
class TestThreadScheduleAndDrain {
public:
explicit TestThreadScheduleAndDrain(WorkSerializer* lock)
explicit TestThreadScheduleAndDrain(grpc_core::WorkSerializer* lock)
: lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) {
gpr_event_init(&done_);
thread_.Start();
@ -172,14 +165,14 @@ class TestThreadScheduleAndDrain {
DEBUG_LOCATION);
}
WorkSerializer* lock_ = nullptr;
Thread thread_;
grpc_core::WorkSerializer* lock_ = nullptr;
grpc_core::Thread thread_;
size_t counter_ = 0;
gpr_event done_;
};
TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
WorkSerializer lock(GetDefaultEventEngine());
grpc_core::WorkSerializer lock;
{
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> threads;
for (size_t i = 0; i < 10; ++i) {
@ -189,7 +182,7 @@ TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) {
}
TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
WorkSerializer lock(GetDefaultEventEngine());
grpc_core::WorkSerializer lock;
{
std::vector<std::unique_ptr<TestThread>> run_threads;
std::vector<std::unique_ptr<TestThreadScheduleAndDrain>> schedule_threads;
@ -203,17 +196,16 @@ TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) {
// Tests that work serializers allow destruction from the last callback
TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) {
auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
auto lock = std::make_shared<grpc_core::WorkSerializer>();
lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION);
WaitForSingleOwner(GetDefaultEventEngine());
}
// Tests additional racy conditions when the last callback triggers work
// serializer destruction.
TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
for (int i = 0; i < 1000; ++i) {
auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
Notification notification;
auto lock = std::make_shared<grpc_core::WorkSerializer>();
grpc_core::Notification notification;
std::thread t1([&]() {
notification.WaitForNotification();
lock.reset();
@ -226,7 +218,7 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRace) {
// Tests racy conditions when the last callback triggers work
// serializer destruction.
TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
auto lock = std::make_shared<WorkSerializer>(GetDefaultEventEngine());
auto lock = std::make_shared<grpc_core::WorkSerializer>();
absl::Barrier barrier(11);
std::vector<std::thread> threads;
threads.reserve(10);
@ -245,53 +237,42 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) {
#ifndef NDEBUG
TEST(WorkSerializerTest, RunningInWorkSerializer) {
auto work_serializer1 =
std::make_shared<WorkSerializer>(GetDefaultEventEngine());
auto work_serializer2 =
std::make_shared<WorkSerializer>(GetDefaultEventEngine());
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
work_serializer1->Run(
[=]() {
EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
work_serializer2->Run(
[=]() {
EXPECT_EQ(work_serializer1->RunningInWorkSerializer(),
!IsWorkSerializerDispatchEnabled());
EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
grpc_core::WorkSerializer work_serializer1;
grpc_core::WorkSerializer work_serializer2;
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
work_serializer2->Run(
[=]() {
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2->RunningInWorkSerializer());
work_serializer1->Run(
[=]() {
EXPECT_TRUE(work_serializer1->RunningInWorkSerializer());
EXPECT_EQ(work_serializer2->RunningInWorkSerializer(),
!IsWorkSerializerDispatchEnabled());
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
work_serializer2.Run(
[&]() {
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
work_serializer1.Run(
[&]() {
EXPECT_TRUE(work_serializer1.RunningInWorkSerializer());
EXPECT_TRUE(work_serializer2.RunningInWorkSerializer());
},
DEBUG_LOCATION);
},
DEBUG_LOCATION);
EXPECT_FALSE(work_serializer1->RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2->RunningInWorkSerializer());
Notification done1;
Notification done2;
work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION);
work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION);
done1.WaitForNotification();
done2.WaitForNotification();
EXPECT_FALSE(work_serializer1.RunningInWorkSerializer());
EXPECT_FALSE(work_serializer2.RunningInWorkSerializer());
}
#endif
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);

@ -358,10 +358,9 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
{
grpc_core::ExecCtx exec_ctx;
fake_resolver_response_generator->SetResponseSynchronously(
BuildResolverResponse(
{absl::StrCat("ipv4:", kSharedUnconnectableAddress),
absl::StrCat("ipv4:", test_server->address())}));
fake_resolver_response_generator->SetResponse(BuildResolverResponse(
{absl::StrCat("ipv4:", kSharedUnconnectableAddress),
absl::StrCat("ipv4:", test_server->address())}));
}
args.push_back(grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
fake_resolver_response_generator.get()));

@ -492,9 +492,8 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
for (int i = 0; i < 3; i++) {
gpr_log(GPR_INFO, "Expected keepalive time : %d",
expected_keepalive_time_sec);
response_generator->SetResponseSynchronously(
BuildResolverResult({absl::StrCat(
"ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
response_generator->SetResponse(BuildResolverResult({absl::StrCat(
"ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
// ExecCtx::Flush() might not be enough to make sure that the resolver
// result has been propagated, so sleep for a bit.
grpc_core::ExecCtx::Get()->Flush();
@ -507,7 +506,7 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
GPR_INFO,
"Client keepalive time %d should now be in sync with the server settings",
expected_keepalive_time_sec);
response_generator->SetResponseSynchronously(
response_generator->SetResponse(
BuildResolverResult({absl::StrCat("ipv4:", server_address2)}));
grpc_core::ExecCtx::Get()->Flush();
gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
@ -555,7 +554,7 @@ TEST_F(KeepaliveThrottlingTest,
grpc_channel* channel =
grpc_channel_create("fake:///", creds, &client_channel_args);
grpc_channel_credentials_release(creds);
response_generator->SetResponseSynchronously(
response_generator->SetResponse(
BuildResolverResult({absl::StrCat("ipv4:", server_address1),
absl::StrCat("ipv4:", server_address2)}));
// For a single subchannel 3 GOAWAYs would be sufficient to increase the

@ -76,7 +76,7 @@ void TryConnectAndDestroy() {
<< lb_address_result.service_config.status();
lb_address_result.args = grpc_core::SetGrpcLbBalancerAddresses(
grpc_core::ChannelArgs(), addresses);
response_generator->SetResponseAsync(lb_address_result);
response_generator->SetResponse(lb_address_result);
grpc::ChannelArguments args;
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator.get());

@ -596,7 +596,6 @@ grpc_cc_test(
"gtest",
],
flaky = True, # TODO(b/150567713)
shard_count = 20,
tags = [
"cpp_end2end_test",
"cpp_lb_end2end_test",

@ -13,7 +13,6 @@
// limitations under the License.
#include <algorithm>
#include <chrono>
#include <deque>
#include <memory>
#include <mutex>
@ -220,13 +219,13 @@ class FakeResolverResponseGeneratorWrapper {
const grpc_core::ChannelArgs& per_address_args =
grpc_core::ChannelArgs()) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponseSynchronously(BuildFakeResults(
response_generator_->SetResponse(BuildFakeResults(
ipv6_only_, ports, service_config_json, per_address_args));
}
void SetNextResolutionUponError(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetReresolutionResponseSynchronously(
response_generator_->SetReresolutionResponse(
BuildFakeResults(ipv6_only_, ports));
}
@ -237,7 +236,7 @@ class FakeResolverResponseGeneratorWrapper {
void SetResponse(grpc_core::Resolver::Result result) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponseSynchronously(std::move(result));
response_generator_->SetResponse(std::move(result));
}
grpc_core::FakeResolverResponseGenerator* Get() const {
@ -636,14 +635,10 @@ TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
// Note that this call also returns IDLE, since the state change has
// not yet occurred; it just gets triggered by this call.
EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
// Now that the channel is trying to connect, we should get to state
// Now that the channel is trying to connect, we should be in state
// CONNECTING.
ASSERT_TRUE(
WaitForChannelState(channel.get(), [&](grpc_connectivity_state state) {
if (state == GRPC_CHANNEL_IDLE) return false;
EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING);
return true;
}));
EXPECT_EQ(channel->GetState(false /* try_to_connect */),
GRPC_CHANNEL_CONNECTING);
// Return a resolver result, which allows the connection attempt to proceed.
response_generator.SetNextResolution(GetServersPorts());
// We should eventually transition into state READY.
@ -1305,7 +1300,6 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
grpc_core::Resolver::Result result;
result.addresses.emplace();
result.result_health_callback = [&](absl::Status status) {
gpr_log(GPR_INFO, "****** RESULT HEALTH CALLBACK *******");
EXPECT_EQ(absl::StatusCode::kUnavailable, status.code());
EXPECT_EQ("address list must not be empty", status.message()) << status;
notification.Notify();
@ -1318,8 +1312,8 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) {
};
EXPECT_TRUE(
WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true));
// Callback should run.
notification.WaitForNotification();
// Callback should have been run.
ASSERT_TRUE(notification.HasBeenNotified());
// Return a valid address.
gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******");
StartServers(1);
@ -1614,7 +1608,6 @@ TEST_F(RoundRobinTest, Updates) {
ports.clear();
ports.emplace_back(servers_[1]->port_);
response_generator.SetNextResolution(ports);
WaitForChannelReady(channel.get());
WaitForServer(DEBUG_LOCATION, stub, 1);
EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false));
// Check LB policy name for the channel.

@ -605,7 +605,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = MakeResolverResult(
balancer_address_data, backend_address_data, service_config_json);
response_generator_->SetResponseSynchronously(std::move(result));
response_generator_->SetResponse(std::move(result));
}
void SetNextReresolutionResponse(
@ -613,11 +613,9 @@ class GrpclbEnd2endTest : public ::testing::Test {
const std::vector<AddressData>& backend_address_data = {},
const char* service_config_json = kDefaultServiceConfig) {
grpc_core::ExecCtx exec_ctx;
response_generator_->WaitForResolverSet();
grpc_core::Resolver::Result result = MakeResolverResult(
balancer_address_data, backend_address_data, service_config_json);
response_generator_->SetReresolutionResponseSynchronously(
std::move(result));
response_generator_->SetReresolutionResponse(std::move(result));
}
std::vector<int> GetBackendPorts(size_t start_index = 0,

@ -135,8 +135,7 @@ class FakeResolverResponseGeneratorWrapper {
void SetNextResolution(absl::string_view service_config_json) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponseSynchronously(
BuildFakeResults(service_config_json));
response_generator_->SetResponse(BuildFakeResults(service_config_json));
}
grpc_core::FakeResolverResponseGenerator* Get() const {

@ -192,7 +192,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
void SetNextResolutionNoServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
response_generator_->SetResponseSynchronously(result);
response_generator_->SetResponse(result);
}
void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
@ -201,7 +201,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
result.service_config =
grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), "{}");
ASSERT_TRUE(result.service_config.ok()) << result.service_config.status();
response_generator_->SetResponseSynchronously(result);
response_generator_->SetResponse(result);
}
void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
@ -209,7 +209,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config =
absl::InvalidArgumentError("error parsing service config");
response_generator_->SetResponseSynchronously(result);
response_generator_->SetResponse(result);
}
void SetNextResolutionWithServiceConfig(const std::vector<int>& ports,
@ -218,7 +218,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test {
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config =
grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), svc_cfg);
response_generator_->SetResponseSynchronously(result);
response_generator_->SetResponse(result);
}
std::vector<int> GetServersPorts(size_t start_index = 0) {

@ -101,7 +101,7 @@ TEST_P(LogicalDNSClusterTest, Basic) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// RPCs should succeed.
@ -287,7 +287,7 @@ TEST_P(AggregateClusterTest, EdsToLogicalDns) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2));
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Wait for traffic to go to backend 0.
@ -348,7 +348,7 @@ TEST_P(AggregateClusterTest, LogicalDnsToEds) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts(0, 1));
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Wait for traffic to go to backend 0.
@ -419,7 +419,7 @@ TEST_P(AggregateClusterTest, ReconfigEdsWhileLogicalDnsChildFails) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = absl::UnavailableError("injected error");
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// When an RPC fails, we know the channel has seen the update.

@ -210,7 +210,7 @@ TEST_P(RingHashTest,
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Inject connection delay to make this act more realistically.
@ -278,7 +278,7 @@ TEST_P(RingHashTest,
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result;
result.addresses = CreateAddressListFromPortList(GetBackendPorts());
logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
logical_dns_cluster_resolver_response_generator_->SetResponse(
std::move(result));
}
// Set up connection attempt injector.

@ -107,8 +107,7 @@ void ArgsInit(ArgsStruct* args) {
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
args->lock = std::make_shared<grpc_core::WorkSerializer>();
gpr_atm_rel_store(&args->done_atm, 0);
args->channel_args = nullptr;
}

@ -201,8 +201,7 @@ void ArgsInit(ArgsStruct* args) {
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = std::make_shared<grpc_core::WorkSerializer>(
grpc_event_engine::experimental::GetDefaultEventEngine());
args->lock = std::make_shared<grpc_core::WorkSerializer>();
args->done = false;
args->channel_args = nullptr;
}

Loading…
Cancel
Save