diff --git a/BUILD b/BUILD index 31f9cf4c662..bde7f174e6a 100644 --- a/BUILD +++ b/BUILD @@ -2398,20 +2398,14 @@ grpc_cc_library( hdrs = [ "//src/core:lib/gprpp/work_serializer.h", ], - external_deps = [ - "absl/base:core_headers", - "absl/container:inlined_vector", - ], + external_deps = ["absl/base:core_headers"], language = "c++", visibility = ["@grpc:client_channel"], deps = [ "debug_location", - "event_engine_base_hdrs", - "exec_ctx", "gpr", "grpc_trace", "orphanable", - "//src/core:experiments", ], ) @@ -3072,7 +3066,6 @@ grpc_cc_library( "//src/core:env", "//src/core:error", "//src/core:gpr_atm", - "//src/core:gpr_manual_constructor", "//src/core:grpc_backend_metric_data", "//src/core:grpc_deadline_filter", "//src/core:grpc_service_config", @@ -3730,7 +3723,6 @@ grpc_cc_library( "work_serializer", "//src/core:channel_args", "//src/core:grpc_service_config", - "//src/core:notification", "//src/core:ref_counted", "//src/core:useful", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index ed16090d333..f78ef975c91 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8270,6 +8270,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) target_link_libraries(cf_event_engine_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc_test_util ) @@ -12218,6 +12219,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) target_link_libraries(fuzzing_event_engine_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure ${_gRPC_PROTOBUF_LIBRARIES} grpc_test_util ) @@ -13509,6 +13511,7 @@ target_include_directories(h2_ssl_cert_test target_link_libraries(h2_ssl_cert_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc_test_util ) @@ -16574,6 +16577,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) target_link_libraries(oracle_event_engine_posix_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc_test_util ) @@ -16759,13 +16763,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(outlier_detection_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h test/core/client_channel/lb_policy/outlier_detection_test.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc ) target_compile_features(outlier_detection_test PUBLIC cxx_std_14) target_include_directories(outlier_detection_test @@ -16790,7 +16788,6 @@ target_include_directories(outlier_detection_test target_link_libraries(outlier_detection_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - ${_gRPC_PROTOBUF_LIBRARIES} grpc_test_util ) @@ -17148,13 +17145,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(pick_first_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h test/core/client_channel/lb_policy/pick_first_test.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc ) target_compile_features(pick_first_test PUBLIC cxx_std_14) target_include_directories(pick_first_test @@ -17179,7 +17170,6 @@ target_include_directories(pick_first_test target_link_libraries(pick_first_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - ${_gRPC_PROTOBUF_LIBRARIES} grpc_test_util ) @@ -17594,6 +17584,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) target_link_libraries(posix_endpoint_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc_test_util ) @@ -17667,6 +17658,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) target_link_libraries(posix_event_engine_connect_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc_test_util ) @@ -17711,6 +17703,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) target_link_libraries(posix_event_engine_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc++_test_util ) @@ -20694,13 +20687,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(round_robin_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h test/core/client_channel/lb_policy/round_robin_test.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc ) target_compile_features(round_robin_test PUBLIC cxx_std_14) target_include_directories(round_robin_test @@ -20725,7 +20712,6 @@ target_include_directories(round_robin_test target_link_libraries(round_robin_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - ${_gRPC_PROTOBUF_LIBRARIES} grpc_test_util ) @@ -23294,7 +23280,6 @@ add_executable(test_core_channel_channelz_test ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/channelz/channelz.grpc.pb.h test/core/channel/channelz_test.cc - test/core/event_engine/event_engine_test_utils.cc test/cpp/util/channel_trace_proto_helper.cc ) target_compile_features(test_core_channel_channelz_test PUBLIC cxx_std_14) @@ -24580,6 +24565,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) target_link_libraries(thready_posix_event_engine_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + grpc_unsecure grpc_test_util ) @@ -25614,13 +25600,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(weighted_round_robin_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h test/core/client_channel/lb_policy/weighted_round_robin_test.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc ) target_compile_features(weighted_round_robin_test PUBLIC cxx_std_14) target_include_directories(weighted_round_robin_test @@ -25645,7 +25625,6 @@ target_include_directories(weighted_round_robin_test target_link_libraries(weighted_round_robin_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - ${_gRPC_PROTOBUF_LIBRARIES} grpc_test_util ) @@ -25958,7 +25937,6 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_executable(work_serializer_test - test/core/event_engine/event_engine_test_utils.cc test/core/gprpp/work_serializer_test.cc ) target_compile_features(work_serializer_test PUBLIC cxx_std_14) @@ -28658,13 +28636,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(xds_override_host_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h test/core/client_channel/lb_policy/xds_override_host_test.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc ) target_compile_features(xds_override_host_test PUBLIC cxx_std_14) target_include_directories(xds_override_host_test @@ -28689,7 +28661,6 @@ target_include_directories(xds_override_host_test target_link_libraries(xds_override_host_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - ${_gRPC_PROTOBUF_LIBRARIES} grpc_test_util ) diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 9f0e1fbd4dc..754f6078c2e 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -23,12 +23,11 @@ EXPERIMENTS = { "off": { "core_end2end_test": [ "event_engine_listener", + "promise_based_client_call", "promise_based_server_call", - "work_serializer_dispatch", ], "cpp_end2end_test": [ "promise_based_server_call", - "work_serializer_dispatch", ], "endpoint_test": [ "tcp_frame_size_tuning", @@ -42,8 +41,8 @@ EXPERIMENTS = { "tcp_frame_size_tuning", "tcp_rcv_lowat", ], - "lb_unit_test": [ - "work_serializer_dispatch", + "lame_client_test": [ + "promise_based_client_call", ], "logging_test": [ "promise_based_server_call", @@ -55,7 +54,6 @@ EXPERIMENTS = { ], "xds_end2end_test": [ "promise_based_server_call", - "work_serializer_dispatch", ], }, "on": { @@ -82,12 +80,11 @@ EXPERIMENTS = { "off": { "core_end2end_test": [ "event_engine_listener", + "promise_based_client_call", "promise_based_server_call", - "work_serializer_dispatch", ], "cpp_end2end_test": [ "promise_based_server_call", - "work_serializer_dispatch", ], "endpoint_test": [ "tcp_frame_size_tuning", @@ -101,8 +98,8 @@ EXPERIMENTS = { "tcp_frame_size_tuning", "tcp_rcv_lowat", ], - "lb_unit_test": [ - "work_serializer_dispatch", + "lame_client_test": [ + "promise_based_client_call", ], "logging_test": [ "promise_based_server_call", @@ -114,7 +111,6 @@ EXPERIMENTS = { ], "xds_end2end_test": [ "promise_based_server_call", - "work_serializer_dispatch", ], }, "on": { @@ -145,12 +141,11 @@ EXPERIMENTS = { "core_end2end_test": [ "event_engine_client", "event_engine_listener", + "promise_based_client_call", "promise_based_server_call", - "work_serializer_dispatch", ], "cpp_end2end_test": [ "promise_based_server_call", - "work_serializer_dispatch", ], "endpoint_test": [ "tcp_frame_size_tuning", @@ -167,8 +162,8 @@ EXPERIMENTS = { "tcp_frame_size_tuning", "tcp_rcv_lowat", ], - "lb_unit_test": [ - "work_serializer_dispatch", + "lame_client_test": [ + "promise_based_client_call", ], "logging_test": [ "promise_based_server_call", @@ -183,7 +178,6 @@ EXPERIMENTS = { ], "xds_end2end_test": [ "promise_based_server_call", - "work_serializer_dispatch", ], }, "on": { diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f045b77f7da..7636c17cc6d 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6493,6 +6493,7 @@ targets: - test/core/event_engine/test_suite/tests/timer_test.cc deps: - gtest + - grpc_unsecure - grpc_test_util platforms: - linux @@ -8846,6 +8847,7 @@ targets: - test/core/event_engine/test_suite/tests/timer_test.cc deps: - gtest + - grpc_unsecure - protobuf - grpc_test_util platforms: @@ -9497,6 +9499,7 @@ targets: - test/core/event_engine/event_engine_test_utils.cc deps: - gtest + - grpc_unsecure - grpc_test_util - name: h2_ssl_session_reuse_test gtest: true @@ -11279,6 +11282,7 @@ targets: - test/core/event_engine/test_suite/tests/server_test.cc deps: - gtest + - grpc_unsecure - grpc_test_util platforms: - linux @@ -11350,16 +11354,11 @@ targets: language: c++ headers: - test/core/client_channel/lb_policy/lb_policy_test_lib.h - - test/core/event_engine/event_engine_test_utils.h - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/event_engine/mock_event_engine.h src: - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/client_channel/lb_policy/outlier_detection_test.cc - - test/core/event_engine/event_engine_test_utils.cc - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc deps: - gtest - - protobuf - grpc_test_util uses_polling: false - name: overload_test @@ -11581,17 +11580,12 @@ targets: language: c++ headers: - test/core/client_channel/lb_policy/lb_policy_test_lib.h - - test/core/event_engine/event_engine_test_utils.h - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/event_engine/mock_event_engine.h - test/core/util/scoped_env_var.h src: - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/client_channel/lb_policy/pick_first_test.cc - - test/core/event_engine/event_engine_test_utils.cc - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc deps: - gtest - - protobuf - grpc_test_util uses_polling: false - name: pid_controller_test @@ -11856,6 +11850,7 @@ targets: - test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc deps: - gtest + - grpc_unsecure - grpc_test_util platforms: - linux @@ -11890,6 +11885,7 @@ targets: - test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc deps: - gtest + - grpc_unsecure - grpc_test_util platforms: - linux @@ -11921,6 +11917,7 @@ targets: - test/cpp/util/get_grpc_test_runfile_dir.cc deps: - gtest + - grpc_unsecure - grpc++_test_util platforms: - linux @@ -13922,16 +13919,11 @@ targets: language: c++ headers: - test/core/client_channel/lb_policy/lb_policy_test_lib.h - - test/core/event_engine/event_engine_test_utils.h - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/event_engine/mock_event_engine.h src: - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/client_channel/lb_policy/round_robin_test.cc - - test/core/event_engine/event_engine_test_utils.cc - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc deps: - gtest - - protobuf - grpc_test_util uses_polling: false - name: secure_auth_context_test @@ -15273,12 +15265,10 @@ targets: build: test language: c++ headers: - - test/core/event_engine/event_engine_test_utils.h - test/cpp/util/channel_trace_proto_helper.h src: - src/proto/grpc/channelz/channelz.proto - test/core/channel/channelz_test.cc - - test/core/event_engine/event_engine_test_utils.cc - test/cpp/util/channel_trace_proto_helper.cc deps: - gtest @@ -16341,6 +16331,7 @@ targets: - test/core/event_engine/test_suite/thready_posix_event_engine_test.cc deps: - gtest + - grpc_unsecure - grpc_test_util platforms: - linux @@ -16873,16 +16864,11 @@ targets: language: c++ headers: - test/core/client_channel/lb_policy/lb_policy_test_lib.h - - test/core/event_engine/event_engine_test_utils.h - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/event_engine/mock_event_engine.h src: - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/client_channel/lb_policy/weighted_round_robin_test.cc - - test/core/event_engine/event_engine_test_utils.cc - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc deps: - gtest - - protobuf - grpc_test_util uses_polling: false - name: win_socket_test @@ -17156,10 +17142,8 @@ targets: build: test run: false language: c++ - headers: - - test/core/event_engine/event_engine_test_utils.h + headers: [] src: - - test/core/event_engine/event_engine_test_utils.cc - test/core/gprpp/work_serializer_test.cc deps: - gtest @@ -18195,16 +18179,11 @@ targets: language: c++ headers: - test/core/client_channel/lb_policy/lb_policy_test_lib.h - - test/core/event_engine/event_engine_test_utils.h - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/event_engine/mock_event_engine.h src: - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/client_channel/lb_policy/xds_override_host_test.cc - - test/core/event_engine/event_engine_test_utils.cc - - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc deps: - gtest - - protobuf - grpc_test_util uses_polling: false - name: xds_pick_first_end2end_test diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 289abd5943f..6cfd4c48fcd 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -70,9 +70,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/unique_type_name.h" @@ -342,7 +340,11 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { chand_, this, Activity::current()->DebugTag().c_str(), result.has_value() ? result->ToString().c_str() : "Pending"); } - if (!result.has_value()) return Pending{}; + if (!result.has_value()) { + waker_ = Activity::current()->MakeNonOwningWaker(); + was_queued_ = true; + return Pending{}; + } if (!result->ok()) return *result; call_args.client_initial_metadata = std::move(client_initial_metadata_); return std::move(call_args); @@ -360,17 +362,10 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { return GetContext(); } - void OnAddToQueueLocked() override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) { - waker_ = Activity::current()->MakeNonOwningWaker(); - was_queued_ = true; - } - - void RetryCheckResolutionLocked() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) override { + void RetryCheckResolutionLocked() override { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { - gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked(): %s", - chand_, this, waker_.ActivityDebugTag().c_str()); + gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked()", + chand_, this); } waker_.WakeupAsync(); } @@ -387,7 +382,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { grpc_polling_entity pollent_; ClientMetadataHandle client_initial_metadata_; bool was_queued_ = false; - Waker waker_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_); + Waker waker_; }; // @@ -1183,8 +1178,7 @@ ClientChannel::ClientChannel(grpc_channel_element_args* args, interested_parties_(grpc_pollset_set_create()), service_config_parser_index_( internal::ClientChannelServiceConfigParser::ParserIndex()), - work_serializer_( - std::make_shared(*args->channel_stack->event_engine)), + work_serializer_(std::make_shared()), state_tracker_("client_channel", GRPC_CHANNEL_IDLE), subchannel_pool_(GetSubchannelPool(channel_args_)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) { @@ -1744,10 +1738,6 @@ void ClientChannel::DestroyResolverAndLbPolicyLocked() { void ClientChannel::UpdateStateLocked(grpc_connectivity_state state, const absl::Status& status, const char* reason) { - if (state != GRPC_CHANNEL_SHUTDOWN && - state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) { - Crash("Illegal transition SHUTDOWN -> anything"); - } state_tracker_.SetState(state, status, reason); if (channelz_node_ != nullptr) { channelz_node_->SetConnectivityState(state); @@ -1944,12 +1934,10 @@ void ClientChannel::GetChannelInfo(grpc_channel_element* elem, } void ClientChannel::TryToConnectLocked() { - if (disconnect_error_.ok()) { - if (lb_policy_ != nullptr) { - lb_policy_->ExitIdleLocked(); - } else if (resolver_ == nullptr) { - CreateResolverLocked(); - } + if (lb_policy_ != nullptr) { + lb_policy_->ExitIdleLocked(); + } else if (resolver_ == nullptr) { + CreateResolverLocked(); } GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect"); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index b9db3eeade3..2d079ed2b7d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1588,7 +1588,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked() { // Pass channel creds via channel args, since the fake resolver won't // do this automatically. result.args = lb_channel_args.SetObject(std::move(channel_credentials)); - response_generator_->SetResponseAsync(std::move(result)); + response_generator_->SetResponse(std::move(result)); // Return status. return status; } diff --git a/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h b/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h index 160cadd86fc..eee94904ebb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h +++ b/src/core/ext/filters/client_channel/lb_policy/health_check_client_internal.h @@ -125,8 +125,7 @@ class HealthProducer : public Subchannel::DataProducerInterface { WeakRefCountedPtr producer_; absl::string_view health_check_service_name_; std::shared_ptr work_serializer_ = - std::make_shared( - producer_->subchannel_->event_engine()); + std::make_shared(); absl::optional state_ ABSL_GUARDED_BY(&HealthProducer::mu_); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index 5e97f25f57a..9500f5e9392 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -214,31 +214,25 @@ FakeResolverResponseGenerator::FakeResolverResponseGenerator() {} FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {} -void FakeResolverResponseGenerator::SetResponseAndNotify( - Resolver::Result result, Notification* notify_when_set) { +void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { RefCountedPtr resolver; { MutexLock lock(&mu_); if (resolver_ == nullptr) { has_result_ = true; result_ = std::move(result); - if (notify_when_set != nullptr) notify_when_set->Notify(); return; } resolver = resolver_->Ref(); } FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(resolver, std::move(result)); - resolver->work_serializer_->Run( - [arg, notify_when_set]() { - arg->SetResponseLocked(); - if (notify_when_set != nullptr) notify_when_set->Notify(); - }, - DEBUG_LOCATION); + resolver->work_serializer_->Run([arg]() { arg->SetResponseLocked(); }, + DEBUG_LOCATION); } -void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify( - Resolver::Result result, Notification* notify_when_set) { +void FakeResolverResponseGenerator::SetReresolutionResponse( + Resolver::Result result) { RefCountedPtr resolver; { MutexLock lock(&mu_); @@ -248,11 +242,7 @@ void FakeResolverResponseGenerator::SetReresolutionResponseAndNotify( FakeResolverResponseSetter* arg = new FakeResolverResponseSetter( resolver, std::move(result), true /* has_result */); resolver->work_serializer_->Run( - [arg, notify_when_set]() { - arg->SetReresolutionResponseLocked(); - if (notify_when_set != nullptr) notify_when_set->Notify(); - }, - DEBUG_LOCATION); + [arg]() { arg->SetReresolutionResponseLocked(); }, DEBUG_LOCATION); } void FakeResolverResponseGenerator::UnsetReresolutionResponse() { @@ -299,7 +289,6 @@ void FakeResolverResponseGenerator::SetFakeResolver( RefCountedPtr resolver) { MutexLock lock(&mu_); resolver_ = std::move(resolver); - cv_.SignalAll(); if (resolver_ == nullptr) return; if (has_result_) { FakeResolverResponseSetter* arg = @@ -310,13 +299,6 @@ void FakeResolverResponseGenerator::SetFakeResolver( } } -void FakeResolverResponseGenerator::WaitForResolverSet() { - MutexLock lock(&mu_); - while (resolver_ == nullptr) { - cv_.Wait(&mu_); - } -} - namespace { void* ResponseGeneratorChannelArgCopy(void* p) { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index b0463eee9d6..bedcf8de2b4 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -19,15 +19,12 @@ #include -#include - #include "absl/base/thread_annotations.h" #include "absl/strings/string_view.h" #include #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -60,39 +57,13 @@ class FakeResolverResponseGenerator // instance to trigger a new resolution with the specified result. If the // resolver is not available yet, delays response setting until it is. This // can be called at most once before the resolver is available. - // notify_when_set is an optional notification to signal when the response has - // been set. - void SetResponseAndNotify(Resolver::Result result, - Notification* notify_when_set); - - // Same as SetResponseAndNotify(), assume that async setting is fine - void SetResponseAsync(Resolver::Result result) { - SetResponseAndNotify(std::move(result), nullptr); - } - - // Same as SetResponseAndNotify(), but create and wait for the notification - void SetResponseSynchronously(Resolver::Result result) { - Notification n; - SetResponseAndNotify(std::move(result), &n); - n.WaitForNotification(); - } + void SetResponse(Resolver::Result result); // Sets the re-resolution response, which is returned by the fake resolver // when re-resolution is requested (via \a RequestReresolutionLocked()). // The new re-resolution response replaces any previous re-resolution // response that may have been set by a previous call. - // notify_when_set is an optional notification to signal when the response has - // been set. - void SetReresolutionResponseAndNotify(Resolver::Result result, - Notification* notify_when_set); - void SetReresolutionResponseAsync(Resolver::Result result) { - SetReresolutionResponseAndNotify(std::move(result), nullptr); - } - void SetReresolutionResponseSynchronously(Resolver::Result result) { - Notification n; - SetReresolutionResponseAndNotify(std::move(result), &n); - n.WaitForNotification(); - } + void SetReresolutionResponse(Resolver::Result result); // Unsets the re-resolution response. After this, the fake resolver will // not return anything when \a RequestReresolutionLocked() is called. @@ -117,10 +88,6 @@ class FakeResolverResponseGenerator return GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR; } - // Wait for a resolver to be set (setting may be happening asynchronously, so - // this may block - consider it test only). - void WaitForResolverSet(); - static int ChannelArgsCompare(const FakeResolverResponseGenerator* a, const FakeResolverResponseGenerator* b) { return QsortCompare(a, b); @@ -133,7 +100,6 @@ class FakeResolverResponseGenerator // Mutex protecting the members below. Mutex mu_; - CondVar cv_; RefCountedPtr resolver_ ABSL_GUARDED_BY(mu_); Resolver::Result result_ ABSL_GUARDED_BY(mu_); bool has_result_ ABSL_GUARDED_BY(mu_) = false; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 2799ba8c25e..0a090637b2d 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -463,7 +463,6 @@ Subchannel::Subchannel(SubchannelKey key, pollset_set_(grpc_pollset_set_create()), connector_(std::move(connector)), watcher_list_(this), - work_serializer_(args_.GetObjectRef()), backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), event_engine_(args_.GetObjectRef()) { // A grpc_init is added here to ensure that grpc_shutdown does not happen diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index fe629fc3716..79258512647 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -272,10 +272,6 @@ class Subchannel : public DualRefCounted { void RemoveDataProducer(DataProducerInterface* data_producer) ABSL_LOCKS_EXCLUDED(mu_); - std::shared_ptr event_engine() { - return event_engine_; - } - private: // A linked list of ConnectivityStateWatcherInterfaces that are monitoring // the subchannel's state. diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index a77707a89f3..06ff09d7f75 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -1491,7 +1491,6 @@ XdsClient::XdsClient( xds_federation_enabled_(XdsFederationEnabled()), api_(this, &grpc_xds_client_trace, bootstrap_->node(), &symtab_, std::move(user_agent_name), std::move(user_agent_version)), - work_serializer_(engine), engine_(std::move(engine)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this); diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h index 9b02ef1337a..9a00f58f396 100644 --- a/src/core/lib/channel/channelz_registry.h +++ b/src/core/lib/channel/channelz_registry.h @@ -25,8 +25,6 @@ #include #include -#include "absl/base/thread_annotations.h" - #include "src/core/lib/channel/channelz.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -92,8 +90,8 @@ class ChannelzRegistry { // protects members Mutex mu_; - std::map node_map_ ABSL_GUARDED_BY(mu_); - intptr_t uuid_generator_ ABSL_GUARDED_BY(mu_) = 0; + std::map node_map_; + intptr_t uuid_generator_ = 0; }; } // namespace channelz diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc index 1943c8e841a..b8b1d94738e 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc @@ -23,7 +23,6 @@ #include #include -#include #include #include @@ -87,15 +86,10 @@ namespace grpc_event_engine { namespace experimental { namespace { -// TODO(ctiller): grpc_core::Timestamp, Duration have very specific contracts -// around time caching and around when the underlying gpr_now call can be -// substituted out. -// We should probably move all usage here to std::chrono to avoid weird bugs in -// the future. - // Maximum amount of time an extra thread is allowed to idle before being // reclaimed. -constexpr auto kIdleThreadLimit = std::chrono::seconds(20); +constexpr grpc_core::Duration kIdleThreadLimit = + grpc_core::Duration::Seconds(20); // Rate at which "Waiting for ..." logs should be printed while quiescing. constexpr size_t kBlockingQuiesceLogRateSeconds = 3; // Minumum time between thread creations. @@ -430,7 +424,7 @@ bool WorkStealingThreadPool::ThreadState::Step() { // * the global queue is empty // * the steal pool returns nullptr bool should_run_again = false; - auto start_time = std::chrono::steady_clock::now(); + grpc_core::Timestamp start_time{grpc_core::Timestamp::Now()}; // Wait until work is available or until shut down. while (!pool_->IsForking()) { // Pull from the global queue next @@ -458,7 +452,7 @@ bool WorkStealingThreadPool::ThreadState::Step() { // has been idle long enough. if (timed_out && pool_->living_thread_count()->count() > pool_->reserve_threads() && - std::chrono::steady_clock::now() - start_time > kIdleThreadLimit) { + grpc_core::Timestamp::Now() - start_time > kIdleThreadLimit) { return false; } } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 41972cdd050..61d87841c08 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -96,11 +96,6 @@ const char* const description_keepalive_server_fix = "Allows overriding keepalive_permit_without_calls for servers. Refer " "https://github.com/grpc/grpc/pull/33917 for more information."; const char* const additional_constraints_keepalive_server_fix = "{}"; -const char* const description_work_serializer_dispatch = - "Have the work serializer dispatch work to event engine for every " - "callback, instead of running things inline in the first thread that " - "successfully enqueues work."; -const char* const additional_constraints_work_serializer_dispatch = "{}"; const char* const description_lazier_stream_updates = "Allow streams to consume up to 50% of the incoming window before we force " "send a flow control update."; @@ -164,8 +159,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_keepalive_fix, false, false}, {"keepalive_server_fix", description_keepalive_server_fix, additional_constraints_keepalive_server_fix, false, false}, - {"work_serializer_dispatch", description_work_serializer_dispatch, - additional_constraints_work_serializer_dispatch, false, true}, {"lazier_stream_updates", description_lazier_stream_updates, additional_constraints_lazier_stream_updates, true, true}, {"jitter_max_idle", description_jitter_max_idle, @@ -253,11 +246,6 @@ const char* const description_keepalive_server_fix = "Allows overriding keepalive_permit_without_calls for servers. Refer " "https://github.com/grpc/grpc/pull/33917 for more information."; const char* const additional_constraints_keepalive_server_fix = "{}"; -const char* const description_work_serializer_dispatch = - "Have the work serializer dispatch work to event engine for every " - "callback, instead of running things inline in the first thread that " - "successfully enqueues work."; -const char* const additional_constraints_work_serializer_dispatch = "{}"; const char* const description_lazier_stream_updates = "Allow streams to consume up to 50% of the incoming window before we force " "send a flow control update."; @@ -321,8 +309,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_keepalive_fix, false, false}, {"keepalive_server_fix", description_keepalive_server_fix, additional_constraints_keepalive_server_fix, false, false}, - {"work_serializer_dispatch", description_work_serializer_dispatch, - additional_constraints_work_serializer_dispatch, false, true}, {"lazier_stream_updates", description_lazier_stream_updates, additional_constraints_lazier_stream_updates, true, true}, {"jitter_max_idle", description_jitter_max_idle, @@ -410,11 +396,6 @@ const char* const description_keepalive_server_fix = "Allows overriding keepalive_permit_without_calls for servers. Refer " "https://github.com/grpc/grpc/pull/33917 for more information."; const char* const additional_constraints_keepalive_server_fix = "{}"; -const char* const description_work_serializer_dispatch = - "Have the work serializer dispatch work to event engine for every " - "callback, instead of running things inline in the first thread that " - "successfully enqueues work."; -const char* const additional_constraints_work_serializer_dispatch = "{}"; const char* const description_lazier_stream_updates = "Allow streams to consume up to 50% of the incoming window before we force " "send a flow control update."; @@ -478,8 +459,6 @@ const ExperimentMetadata g_experiment_metadata[] = { additional_constraints_keepalive_fix, false, false}, {"keepalive_server_fix", description_keepalive_server_fix, additional_constraints_keepalive_server_fix, false, false}, - {"work_serializer_dispatch", description_work_serializer_dispatch, - additional_constraints_work_serializer_dispatch, false, true}, {"lazier_stream_updates", description_lazier_stream_updates, additional_constraints_lazier_stream_updates, true, true}, {"jitter_max_idle", description_jitter_max_idle, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index e940fef4e03..f241376d9ec 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -83,7 +83,6 @@ inline bool IsServerPrivacyEnabled() { return false; } inline bool IsUniqueMetadataStringsEnabled() { return true; } inline bool IsKeepaliveFixEnabled() { return false; } inline bool IsKeepaliveServerFixEnabled() { return false; } -inline bool IsWorkSerializerDispatchEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES inline bool IsLazierStreamUpdatesEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE @@ -116,7 +115,6 @@ inline bool IsServerPrivacyEnabled() { return false; } inline bool IsUniqueMetadataStringsEnabled() { return true; } inline bool IsKeepaliveFixEnabled() { return false; } inline bool IsKeepaliveServerFixEnabled() { return false; } -inline bool IsWorkSerializerDispatchEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES inline bool IsLazierStreamUpdatesEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE @@ -149,7 +147,6 @@ inline bool IsServerPrivacyEnabled() { return false; } inline bool IsUniqueMetadataStringsEnabled() { return true; } inline bool IsKeepaliveFixEnabled() { return false; } inline bool IsKeepaliveServerFixEnabled() { return false; } -inline bool IsWorkSerializerDispatchEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES inline bool IsLazierStreamUpdatesEnabled() { return true; } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE @@ -207,20 +204,16 @@ inline bool IsUniqueMetadataStringsEnabled() { return IsExperimentEnabled(18); } inline bool IsKeepaliveFixEnabled() { return IsExperimentEnabled(19); } #define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_SERVER_FIX inline bool IsKeepaliveServerFixEnabled() { return IsExperimentEnabled(20); } -#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_DISPATCH -inline bool IsWorkSerializerDispatchEnabled() { - return IsExperimentEnabled(21); -} #define GRPC_EXPERIMENT_IS_INCLUDED_LAZIER_STREAM_UPDATES -inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(22); } +inline bool IsLazierStreamUpdatesEnabled() { return IsExperimentEnabled(21); } #define GRPC_EXPERIMENT_IS_INCLUDED_JITTER_MAX_IDLE -inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(23); } +inline bool IsJitterMaxIdleEnabled() { return IsExperimentEnabled(22); } #define GRPC_EXPERIMENT_IS_INCLUDED_ROUND_ROBIN_DELEGATE_TO_PICK_FIRST inline bool IsRoundRobinDelegateToPickFirstEnabled() { - return IsExperimentEnabled(24); + return IsExperimentEnabled(23); } -constexpr const size_t kNumExperiments = 25; +constexpr const size_t kNumExperiments = 24; extern const ExperimentMetadata g_experiment_metadata[kNumExperiments]; #endif diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 766bbfa96f5..7c605d53ff9 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -81,9 +81,7 @@ (ie when all filters in a stack are promise based) expiry: 2023/11/01 owner: ctiller@google.com - # TODO(ctiller): re-enable once we've got some more CI bandwidth - # test_tags: ["core_end2end_test", "lame_client_test"] - test_tags: [] + test_tags: ["core_end2end_test", "lame_client_test"] - name: free_large_allocator description: If set, return all free bytes from a "big" allocator expiry: 2023/11/01 @@ -171,14 +169,6 @@ owner: yashkt@google.com test_tags: [] allow_in_fuzzing_config: false -- name: work_serializer_dispatch - description: - Have the work serializer dispatch work to event engine for every callback, - instead of running things inline in the first thread that successfully - enqueues work. - expiry: 2024/02/10 - owner: ctiller@google.com - test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test", "lb_unit_test"] - name: lazier_stream_updates description: Allow streams to consume up to 50% of the incoming window before we diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml index b74ae5ee41d..f1770b22fae 100644 --- a/src/core/lib/experiments/rollouts.yaml +++ b/src/core/lib/experiments/rollouts.yaml @@ -78,8 +78,6 @@ windows: broken - name: work_stealing default: true -- name: work_serializer_dispatch - default: false - name: client_privacy default: false - name: canary_client_privacy diff --git a/src/core/lib/gprpp/work_serializer.cc b/src/core/lib/gprpp/work_serializer.cc index 1a2b9170f2a..fdade0ebba7 100644 --- a/src/core/lib/gprpp/work_serializer.cc +++ b/src/core/lib/gprpp/work_serializer.cc @@ -20,25 +20,18 @@ #include -#include #include #include #include #include #include -#include "absl/container/inlined_vector.h" - -#include #include #include "src/core/lib/debug/trace.h" -#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { @@ -50,32 +43,13 @@ DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer"); class WorkSerializer::WorkSerializerImpl : public Orphanable { public: - virtual void Run(std::function callback, - const DebugLocation& location) = 0; - virtual void Schedule(std::function callback, - const DebugLocation& location) = 0; - virtual void DrainQueue() = 0; - -#ifndef NDEBUG - virtual bool RunningInWorkSerializer() const = 0; -#endif -}; - -// -// WorkSerializer::LegacyWorkSerializer -// - -class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl { - public: - void Run(std::function callback, - const DebugLocation& location) override; - void Schedule(std::function callback, - const DebugLocation& location) override; - void DrainQueue() override; + void Run(std::function callback, const DebugLocation& location); + void Schedule(std::function callback, const DebugLocation& location); + void DrainQueue(); void Orphan() override; #ifndef NDEBUG - bool RunningInWorkSerializer() const override { + bool RunningInWorkSerializer() const { return std::this_thread::get_id() == current_thread_; } #endif @@ -115,14 +89,6 @@ class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl { return static_cast(ref_pair & 0xffffffffffffu); } -#ifndef NDEBUG - void SetCurrentThread() { current_thread_ = std::this_thread::get_id(); } - void ClearCurrentThread() { current_thread_ = std::thread::id(); } -#else - void SetCurrentThread() {} - void ClearCurrentThread() {} -#endif - // An initial size of 1 keeps track of whether the work serializer has been // orphaned. std::atomic refs_{MakeRefPair(0, 1)}; @@ -132,8 +98,8 @@ class WorkSerializer::LegacyWorkSerializer final : public WorkSerializerImpl { #endif }; -void WorkSerializer::LegacyWorkSerializer::Run(std::function callback, - const DebugLocation& location) { +void WorkSerializer::WorkSerializerImpl::Run(std::function callback, + const DebugLocation& location) { if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]", this, location.file(), location.line()); @@ -146,7 +112,9 @@ void WorkSerializer::LegacyWorkSerializer::Run(std::function callback, GPR_DEBUG_ASSERT(GetSize(prev_ref_pair) > 0); if (GetOwners(prev_ref_pair) == 0) { // We took ownership of the WorkSerializer. Invoke callback and drain queue. - SetCurrentThread(); +#ifndef NDEBUG + current_thread_ = std::this_thread::get_id(); +#endif if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, " Executing immediately"); } @@ -169,7 +137,7 @@ void WorkSerializer::LegacyWorkSerializer::Run(std::function callback, } } -void WorkSerializer::LegacyWorkSerializer::Schedule( +void WorkSerializer::WorkSerializerImpl::Schedule( std::function callback, const DebugLocation& location) { CallbackWrapper* cb_wrapper = new CallbackWrapper(std::move(callback), location); @@ -182,7 +150,7 @@ void WorkSerializer::LegacyWorkSerializer::Schedule( queue_.Push(&cb_wrapper->mpscq_node); } -void WorkSerializer::LegacyWorkSerializer::Orphan() { +void WorkSerializer::WorkSerializerImpl::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this); } @@ -198,7 +166,7 @@ void WorkSerializer::LegacyWorkSerializer::Orphan() { // The thread that calls this loans itself to the work serializer so as to // execute all the scheduled callbacks. -void WorkSerializer::LegacyWorkSerializer::DrainQueue() { +void WorkSerializer::WorkSerializerImpl::DrainQueue() { if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this); } @@ -207,7 +175,9 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueue() { const uint64_t prev_ref_pair = refs_.fetch_add(MakeRefPair(1, 1), std::memory_order_acq_rel); if (GetOwners(prev_ref_pair) == 0) { - SetCurrentThread(); +#ifndef NDEBUG + current_thread_ = std::this_thread::get_id(); +#endif // We took ownership of the WorkSerializer. Drain the queue. DrainQueueOwned(); } else { @@ -219,7 +189,7 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueue() { } } -void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() { +void WorkSerializer::WorkSerializerImpl::DrainQueueOwned() { if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { gpr_log(GPR_INFO, "WorkSerializer::DrainQueueOwned() %p", this); } @@ -236,10 +206,12 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() { } if (GetSize(prev_ref_pair) == 2) { // Queue drained. Give up ownership but only if queue remains empty. +#ifndef NDEBUG // Reset current_thread_ before giving up ownership to avoid TSAN // race. If we don't wind up giving up ownership, we'll set this // again below before we pull the next callback out of the queue. - ClearCurrentThread(); + current_thread_ = std::thread::id(); +#endif uint64_t expected = MakeRefPair(1, 1); if (refs_.compare_exchange_strong(expected, MakeRefPair(0, 1), std::memory_order_acq_rel)) { @@ -254,8 +226,10 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() { delete this; return; } +#ifndef NDEBUG // Didn't wind up giving up ownership, so set current_thread_ again. - SetCurrentThread(); + current_thread_ = std::this_thread::get_id(); +#endif } // There is at least one callback on the queue. Pop the callback from the // queue and execute it. @@ -279,231 +253,14 @@ void WorkSerializer::LegacyWorkSerializer::DrainQueueOwned() { } } -// -// WorkSerializer::DispatchingWorkSerializer -// - -// DispatchingWorkSerializer: executes callbacks one at a time on EventEngine. -// One at a time guarantees that fixed size thread pools in EventEngine -// implementations are not starved of threads by long running work serializers. -// We implement EventEngine::Closure directly to avoid allocating once per -// callback in the queue when scheduling. -class WorkSerializer::DispatchingWorkSerializer final - : public WorkSerializerImpl, - public grpc_event_engine::experimental::EventEngine::Closure { - public: - explicit DispatchingWorkSerializer( - std::shared_ptr - event_engine) - : event_engine_(std::move(event_engine)) {} - void Run(std::function callback, - const DebugLocation& location) override; - void Schedule(std::function callback, - const DebugLocation& location) override { - // We always dispatch to event engine, so Schedule and Run share semantics. - Run(callback, location); - } - void DrainQueue() override {} - void Orphan() override; - - // Override EventEngine::Closure - void Run() override; - -#ifndef NDEBUG - bool RunningInWorkSerializer() const override { - return running_work_serializer_ == this; - } -#endif - - private: - // Wrapper to capture DebugLocation for the callback. - struct CallbackWrapper { - CallbackWrapper(std::function cb, const DebugLocation& loc) - : callback(std::move(cb)), location(loc) {} - std::function callback; - // GPR_NO_UNIQUE_ADDRESS means this is 0 sized in release builds. - GPR_NO_UNIQUE_ADDRESS DebugLocation location; - }; - using CallbackVector = absl::InlinedVector; - - // Refill processing_ from incoming_ - // If processing_ is empty, also update running_ and return false. - // If additionally orphaned, will also delete this (therefore, it's not safe - // to touch any member variables if Refill returns false). - bool Refill(); - - // Perform the parts of Refill that need to acquire mu_ - // Returns a tri-state indicating whether we were refilled successfully (=> - // keep running), or finished, and then if we were orphaned. - enum class RefillResult { kRefilled, kFinished, kFinishedAndOrphaned }; - RefillResult RefillInner(); - -#ifndef NDEBUG - void SetCurrentThread() { running_work_serializer_ = this; } - void ClearCurrentThread() { running_work_serializer_ = nullptr; } -#else - void SetCurrentThread() {} - void ClearCurrentThread() {} -#endif - - // Member variables are roughly sorted to keep processing cache lines - // separated from incoming cache lines. - - // Callbacks that are currently being processed. - // Only accessed by: a Run() call going from not-running to running, or a work - // item being executed in EventEngine -- ie this does not need a mutex because - // all access is serialized. - // Stored in reverse execution order so that callbacks can be `pop_back()`'d - // on completion to free up any resources they hold. - CallbackVector processing_; - // EventEngine instance upon which we'll do our work. - const std::shared_ptr - event_engine_; - // Flags containing run state: - // - running_ goes from false->true whenever the first callback is scheduled - // on an idle WorkSerializer, and transitions back to false after the last - // callback scheduled is completed and the WorkSerializer is again idle. - // - orphaned_ transitions to true once upon Orphan being called. - // When orphaned_ is true and running_ is false, the DispatchingWorkSerializer - // instance is deleted. - bool running_ ABSL_GUARDED_BY(mu_) = false; - bool orphaned_ ABSL_GUARDED_BY(mu_) = false; - Mutex mu_; - // Queued callbacks. New work items land here, and when processing_ is drained - // we move this entire queue into processing_ and work on draining it again. - // In low traffic scenarios this gives two mutex acquisitions per work item, - // but as load increases we get some natural batching and the rate of mutex - // acquisitions per work item tends towards 1. - CallbackVector incoming_ ABSL_GUARDED_BY(mu_); - -#ifndef NDEBUG - static thread_local DispatchingWorkSerializer* running_work_serializer_; -#endif -}; - -#ifndef NDEBUG -thread_local WorkSerializer::DispatchingWorkSerializer* - WorkSerializer::DispatchingWorkSerializer::running_work_serializer_ = - nullptr; -#endif - -void WorkSerializer::DispatchingWorkSerializer::Orphan() { - ReleasableMutexLock lock(&mu_); - // If we're not running, then we can delete immediately. - if (!running_) { - lock.Release(); - delete this; - return; - } - // Otherwise store a flag to delete when we're done. - orphaned_ = true; -} - -// Implementation of WorkSerializerImpl::Run -void WorkSerializer::DispatchingWorkSerializer::Run( - std::function callback, const DebugLocation& location) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { - gpr_log(GPR_INFO, "WorkSerializer[%p] Scheduling callback [%s:%d]", this, - location.file(), location.line()); - } - MutexLock lock(&mu_); - if (!running_) { - // If we were previously idle, insert this callback directly into the empty - // processing_ list and start running. - running_ = true; - GPR_ASSERT(processing_.empty()); - processing_.emplace_back(std::move(callback), location); - event_engine_->Run(this); - } else { - // We are already running, so add this callback to the incoming_ list. - // The work loop will eventually get to it. - incoming_.emplace_back(std::move(callback), location); - } -} - -// Implementation of EventEngine::Closure::Run - our actual work loop -void WorkSerializer::DispatchingWorkSerializer::Run() { - // TODO(ctiller): remove these when we can deprecate ExecCtx - ApplicationCallbackExecCtx app_exec_ctx; - ExecCtx exec_ctx; - // Grab the last element of processing_ - which is the next item in our queue - // since processing_ is stored in reverse order. - auto& cb = processing_.back(); - if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { - gpr_log(GPR_INFO, "WorkSerializer[%p] Executing callback [%s:%d]", this, - cb.location.file(), cb.location.line()); - } - // Run the work item. - SetCurrentThread(); - cb.callback(); - // pop_back here destroys the callback - freeing any resources it might hold. - // We do so before clearing the current thread in case the callback destructor - // wants to check that it's in the WorkSerializer too. - processing_.pop_back(); - ClearCurrentThread(); - // Check if we've drained the queue and if so refill it. - if (processing_.empty() && !Refill()) return; - // There's still work in processing_, so schedule ourselves again on - // EventEngine. - event_engine_->Run(this); -} - -WorkSerializer::DispatchingWorkSerializer::RefillResult -WorkSerializer::DispatchingWorkSerializer::RefillInner() { - // Recover any memory held by processing_, so that we don't grow forever. - // Do so before acquiring a lock so we don't cause inadvertent contention. - processing_.shrink_to_fit(); - MutexLock lock(&mu_); - // Swap incoming_ into processing_ - effectively lets us release memory - // (outside the lock) once per iteration for the storage vectors. - processing_.swap(incoming_); - // If there were no items, then we've finished running. - if (processing_.empty()) { - running_ = false; - // And if we're also orphaned then it's time to delete this object. - if (orphaned_) { - return RefillResult::kFinishedAndOrphaned; - } else { - return RefillResult::kFinished; - } - } - return RefillResult::kRefilled; -} - -bool WorkSerializer::DispatchingWorkSerializer::Refill() { - const auto result = RefillInner(); - switch (result) { - case RefillResult::kRefilled: - // Reverse processing_ so that we can pop_back() items in the correct - // order. (note that this is mostly pointer swaps inside the - // std::function's, so should be relatively cheap even for longer lists). - // Do so here so we're outside of the RefillInner lock. - std::reverse(processing_.begin(), processing_.end()); - return true; - case RefillResult::kFinished: - return false; - case RefillResult::kFinishedAndOrphaned: - // Orphaned and finished - finally delete this object. - // Here so that the mutex lock in RefillInner is released. - delete this; - return false; - } -} - // // WorkSerializer // -WorkSerializer::WorkSerializer( - std::shared_ptr event_engine) - : impl_(IsWorkSerializerDispatchEnabled() - ? OrphanablePtr( - MakeOrphanable( - std::move(event_engine))) - : OrphanablePtr( - MakeOrphanable())) {} +WorkSerializer::WorkSerializer() + : impl_(MakeOrphanable()) {} -WorkSerializer::~WorkSerializer() = default; +WorkSerializer::~WorkSerializer() {} void WorkSerializer::Run(std::function callback, const DebugLocation& location) { diff --git a/src/core/lib/gprpp/work_serializer.h b/src/core/lib/gprpp/work_serializer.h index 051a93f2aae..271de349c44 100644 --- a/src/core/lib/gprpp/work_serializer.h +++ b/src/core/lib/gprpp/work_serializer.h @@ -20,12 +20,9 @@ #include #include -#include #include "absl/base/thread_annotations.h" -#include - #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" @@ -47,28 +44,14 @@ namespace grpc_core { // invoke DrainQueue() when it is safe to invoke the callback. class ABSL_LOCKABLE WorkSerializer { public: - explicit WorkSerializer( - std::shared_ptr - event_engine); - ~WorkSerializer(); + WorkSerializer(); - WorkSerializer(const WorkSerializer&) = delete; - WorkSerializer& operator=(const WorkSerializer&) = delete; - WorkSerializer(WorkSerializer&&) noexcept = default; - WorkSerializer& operator=(WorkSerializer&&) noexcept = default; + ~WorkSerializer(); - // Runs a given callback on the work serializer. - // - // If experiment `work_serializer_dispatch` is enabled: - // The callback will be executed as an EventEngine callback, that then - // arranges for the next callback in the queue to execute. - // - // If experiment `work_serializer_dispatch` is NOT enabled: - // If there is no other thread currently executing the WorkSerializer, the - // callback is run immediately. In this case, the current thread is also - // borrowed for draining the queue for any callbacks that get added in the - // meantime. - // This behavior is deprecated and will be removed soon. + // Runs a given callback on the work serializer. If there is no other thread + // currently executing the WorkSerializer, the callback is run immediately. In + // this case, the current thread is also borrowed for draining the queue for + // any callbacks that get added in the meantime. // // If you want to use clang thread annotation to make sure that callback is // called by WorkSerializer only, you need to add the annotation to both the @@ -81,6 +64,9 @@ class ABSL_LOCKABLE WorkSerializer { // }, DEBUG_LOCATION); // } // void callback() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer) { ... } + // + // TODO(yashkt): Replace DebugLocation with absl::SourceLocation + // once we can start using it directly. void Run(std::function callback, const DebugLocation& location); // Schedule \a callback to be run later when the queue of callbacks is @@ -96,8 +82,6 @@ class ABSL_LOCKABLE WorkSerializer { private: class WorkSerializerImpl; - class LegacyWorkSerializer; - class DispatchingWorkSerializer; OrphanablePtr impl_; }; diff --git a/test/core/channel/BUILD b/test/core/channel/BUILD index f374c745b5d..87c7768127b 100644 --- a/test/core/channel/BUILD +++ b/test/core/channel/BUILD @@ -123,7 +123,6 @@ grpc_cc_test( "//:grpc", "//:grpc++", "//src/core:channel_args", - "//test/core/event_engine:event_engine_test_utils", "//test/core/util:grpc_test_util", "//test/cpp/util:channel_trace_proto_helper", ], diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index 4f346ece06a..5a3750c3e0f 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -22,16 +22,13 @@ #include #include -#include #include -#include #include #include "absl/status/status.h" #include "absl/status/statusor.h" #include "gtest/gtest.h" -#include #include #include #include @@ -41,21 +38,15 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channelz_registry.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_reader.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/server.h" -#include "test/core/event_engine/event_engine_test_utils.h" #include "test/core/util/test_config.h" #include "test/cpp/util/channel_trace_proto_helper.h" -using grpc_event_engine::experimental::GetDefaultEventEngine; -using grpc_event_engine::experimental::WaitForSingleOwner; - namespace grpc_core { namespace channelz { namespace testing { @@ -348,15 +339,9 @@ TEST_P(ChannelzChannelTest, LastCallStartedTime) { class ChannelzRegistryBasedTest : public ::testing::TestWithParam { protected: // ensure we always have a fresh registry for tests. - void SetUp() override { - WaitForSingleOwner(GetDefaultEventEngine()); - ChannelzRegistry::TestOnlyReset(); - } + void SetUp() override { ChannelzRegistry::TestOnlyReset(); } - void TearDown() override { - WaitForSingleOwner(GetDefaultEventEngine()); - ChannelzRegistry::TestOnlyReset(); - } + void TearDown() override { ChannelzRegistry::TestOnlyReset(); } }; TEST_F(ChannelzRegistryBasedTest, BasicGetTopChannelsTest) { @@ -514,27 +499,19 @@ TEST_F(ChannelzRegistryBasedTest, GetTopChannelsUuidAfterCompaction) { even_channels.push_back(std::make_unique()); } } - Notification done; - grpc_event_engine::experimental::GetDefaultEventEngine()->RunAfter( - std::chrono::seconds(5 * grpc_test_slowdown_factor()), [&] { - ExecCtx exec_ctx; - std::string json_str = ChannelzRegistry::GetTopChannels(0); - auto parsed_json = JsonParse(json_str); - ASSERT_TRUE(parsed_json.ok()) << parsed_json.status(); - ASSERT_EQ(parsed_json->type(), Json::Type::kObject); - Json channel_json; - auto it = parsed_json->object().find("channel"); - if (it != parsed_json->object().end()) channel_json = it->second; - ValidateJsonArraySize(channel_json, kLoopIterations); - std::vector uuids = - GetUuidListFromArray(channel_json.array()); - for (int i = 0; i < kLoopIterations; ++i) { - // only the even uuids will still be present. - EXPECT_EQ((i + 1) * 2, uuids[i]); - } - done.Notify(); - }); - done.WaitForNotification(); + std::string json_str = ChannelzRegistry::GetTopChannels(0); + auto parsed_json = JsonParse(json_str); + ASSERT_TRUE(parsed_json.ok()) << parsed_json.status(); + ASSERT_EQ(parsed_json->type(), Json::Type::kObject); + Json channel_json; + auto it = parsed_json->object().find("channel"); + if (it != parsed_json->object().end()) channel_json = it->second; + ValidateJsonArraySize(channel_json, kLoopIterations); + std::vector uuids = GetUuidListFromArray(channel_json.array()); + for (int i = 0; i < kLoopIterations; ++i) { + // only the even uuids will still be present. + EXPECT_EQ((i + 1) * 2, uuids[i]); + } } TEST_F(ChannelzRegistryBasedTest, InternalChannelTest) { diff --git a/test/core/client_channel/lb_policy/BUILD b/test/core/client_channel/lb_policy/BUILD index 26dff1b878a..f807e02903b 100644 --- a/test/core/client_channel/lb_policy/BUILD +++ b/test/core/client_channel/lb_policy/BUILD @@ -34,8 +34,7 @@ grpc_cc_library( deps = [ "//src/core:lb_policy", "//src/core:subchannel_interface", - "//test/core/event_engine:event_engine_test_utils", - "//test/core/event_engine/fuzzing_event_engine", + "//test/core/event_engine:mock_event_engine", ], ) @@ -83,9 +82,7 @@ grpc_cc_test( "gtest", ], language = "C++", - tags = [ - "no_test_ios", - ], + tags = ["no_test_ios"], uses_event_engine = False, uses_polling = False, deps = [ @@ -120,9 +117,7 @@ grpc_cc_test( "gtest", ], language = "C++", - tags = [ - "no_test_ios", - ], + tags = ["no_test_ios"], uses_event_engine = False, uses_polling = False, deps = [ @@ -155,9 +150,7 @@ grpc_cc_test( "gtest", ], language = "C++", - tags = [ - "no_test_ios", - ], + tags = ["no_test_ios"], uses_event_engine = False, uses_polling = False, deps = [ @@ -208,9 +201,7 @@ grpc_cc_test( "gtest", ], language = "C++", - tags = [ - "no_test_ios", - ], + tags = ["no_test_ios"], uses_event_engine = False, uses_polling = False, deps = [ diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 4059f0292da..b4d16ff5252 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -28,14 +29,15 @@ #include #include #include +#include #include #include #include -#include #include #include #include "absl/base/thread_annotations.h" +#include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_format.h" @@ -84,9 +86,7 @@ #include "src/core/lib/service_config/service_config_call_data.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/uri/uri_parser.h" -#include "test/core/event_engine/event_engine_test_utils.h" -#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" -#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" +#include "test/core/event_engine/mock_event_engine.h" namespace grpc_core { namespace testing { @@ -106,7 +106,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { // given SubchannelState object. class FakeSubchannel : public SubchannelInterface { public: - explicit FakeSubchannel(SubchannelState* state) : state_(state) {} + FakeSubchannel(SubchannelState* state, + std::shared_ptr work_serializer) + : state_(state), work_serializer_(std::move(work_serializer)) {} ~FakeSubchannel() override { if (orca_watcher_ != nullptr) { @@ -151,13 +153,27 @@ class LoadBalancingPolicyTest : public ::testing::Test { void OnConnectivityStateChange(grpc_connectivity_state new_state, const absl::Status& status) override { - gpr_log(GPR_INFO, "notifying watcher: state=%s status=%s", - ConnectivityStateName(new_state), status.ToString().c_str()); - watcher_->OnConnectivityStateChange(new_state, status); + watcher()->OnConnectivityStateChange(new_state, status); } private: - std::shared_ptr + SubchannelInterface::ConnectivityStateWatcherInterface* watcher() + const { + return Match( + watcher_, + [](const std::unique_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>& + watcher) { return watcher.get(); }, + [](const std::shared_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>& + watcher) { return watcher.get(); }); + } + + absl::variant< + std::unique_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>, + std::shared_ptr< + SubchannelInterface::ConnectivityStateWatcherInterface>> watcher_; }; @@ -165,10 +181,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface> watcher) override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { auto* watcher_ptr = watcher.get(); auto watcher_wrapper = MakeOrphanable( - state_->work_serializer(), std::move(watcher)); + work_serializer_, std::move(watcher)); watcher_map_[watcher_ptr] = watcher_wrapper.get(); state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN, std::move(watcher_wrapper)); @@ -176,7 +192,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { void CancelConnectivityStateWatch( ConnectivityStateWatcherInterface* watcher) override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { auto it = watcher_map_.find(watcher); if (it == watcher_map_.end()) return; state_->state_tracker_.RemoveWatcher(it->second); @@ -188,9 +204,8 @@ class LoadBalancingPolicyTest : public ::testing::Test { state_->requested_connection_ = true; } - void AddDataWatcher( - std::unique_ptr watcher) override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) { + void AddDataWatcher(std::unique_ptr watcher) + override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { MutexLock lock(&state_->backend_metric_watcher_mu_); auto* w = static_cast(watcher.get()); @@ -207,7 +222,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { auto connectivity_watcher = health_watcher_->TakeWatcher(); auto* connectivity_watcher_ptr = connectivity_watcher.get(); auto watcher_wrapper = MakeOrphanable( - state_->work_serializer(), std::move(connectivity_watcher)); + work_serializer_, std::move(connectivity_watcher)); health_watcher_wrapper_ = watcher_wrapper.get(); state_->state_tracker_.AddWatcher(GRPC_CHANNEL_SHUTDOWN, std::move(watcher_wrapper)); @@ -220,7 +235,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { } void CancelDataWatcher(DataWatcherInterface* watcher) override - ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->test_->work_serializer_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*state_->work_serializer_) { MutexLock lock(&state_->backend_metric_watcher_mu_); auto* w = static_cast(watcher); if (w->type() == OrcaProducer::Type()) { @@ -245,6 +260,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { void ResetBackoff() override {} SubchannelState* state_; + std::shared_ptr work_serializer_; std::map watcher_map_; @@ -253,9 +269,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::unique_ptr orca_watcher_; }; - SubchannelState(absl::string_view address, LoadBalancingPolicyTest* test) + SubchannelState(absl::string_view address, + std::shared_ptr work_serializer) : address_(address), - test_(test), + work_serializer_(std::move(work_serializer)), state_tracker_("LoadBalancingPolicyTest") {} const std::string& address() const { return address_; } @@ -313,44 +330,16 @@ class LoadBalancingPolicyTest : public ::testing::Test { << "bug in test: " << ConnectivityStateName(state) << " must have OK status: " << status; } - // Updating the state in the state tracker will enqueue - // notifications to watchers on the WorkSerializer. If any - // subchannel reports READY, the pick_first leaf policy will then - // start a health watch, whose initial notification will also be - // scheduled on the WorkSerializer. We don't want to return until - // all of those notifications have been delivered. - absl::Notification notification; - test_->work_serializer_->Run( - [&]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*test_->work_serializer_) { - if (validate_state_transition) { - AssertValidConnectivityStateTransition(state_tracker_.state(), - state, location); - } - gpr_log(GPR_INFO, "Setting state on tracker"); - state_tracker_.SetState(state, status, "set from test"); - // SetState() enqueued the connectivity state notifications for - // the subchannel, so we add another callback to the queue to be - // executed after that state notifications has been delivered. - gpr_log(GPR_INFO, - "Waiting for state notifications to be delivered"); - test_->work_serializer_->Run( - [&]() { - gpr_log(GPR_INFO, - "State notifications delivered, waiting for health " - "notifications"); - // Now the connectivity state notifications has been - // delivered. If the state reported was READY, then the - // pick_first leaf policy will have started a health watch, so - // we add another callback to the queue to be executed after - // the initial health watch notification has been delivered. - test_->work_serializer_->Run([&]() { notification.Notify(); }, - DEBUG_LOCATION); - }, - DEBUG_LOCATION); - }, + work_serializer_->Run( + [this, state, status, validate_state_transition, location]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) { + if (validate_state_transition) { + AssertValidConnectivityStateTransition(state_tracker_.state(), + state, location); + } + state_tracker_.SetState(state, status, "set from test"); + }, DEBUG_LOCATION); - notification.WaitForNotification(); - gpr_log(GPR_INFO, "Health notifications delivered"); } // Indicates if any of the associated SubchannelInterface objects @@ -362,8 +351,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { } // To be invoked by FakeHelper. - RefCountedPtr CreateSubchannel() { - return MakeRefCounted(this); + RefCountedPtr CreateSubchannel( + std::shared_ptr work_serializer) { + return MakeRefCounted(this, std::move(work_serializer)); } // Sends an OOB backend metric report to all watchers. @@ -384,15 +374,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { } } - std::shared_ptr work_serializer() { - return test_->work_serializer_; - } - private: const std::string address_; - LoadBalancingPolicyTest* const test_; - ConnectivityStateTracker state_tracker_ - ABSL_GUARDED_BY(*test_->work_serializer_); + std::shared_ptr work_serializer_; + ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_); Mutex requested_connection_mu_; bool requested_connection_ ABSL_GUARDED_BY(&requested_connection_mu_) = @@ -424,7 +409,13 @@ class LoadBalancingPolicyTest : public ::testing::Test { std::string ToString() const { return "RERESOLUTION"; } }; - explicit FakeHelper(LoadBalancingPolicyTest* test) : test_(test) {} + FakeHelper(LoadBalancingPolicyTest* test, + std::shared_ptr work_serializer, + std::shared_ptr + event_engine) + : test_(test), + work_serializer_(std::move(work_serializer)), + event_engine_(std::move(event_engine)) {} bool QueueEmpty() { MutexLock lock(&mu_); @@ -482,39 +473,6 @@ class LoadBalancingPolicyTest : public ::testing::Test { } private: - // A wrapper for a picker that hops into the WorkSerializer to - // release the ref to the picker. - class PickerWrapper : public LoadBalancingPolicy::SubchannelPicker { - public: - PickerWrapper(LoadBalancingPolicyTest* test, - RefCountedPtr picker) - : test_(test), picker_(std::move(picker)) { - gpr_log(GPR_INFO, "creating wrapper %p for picker %p", this, - picker_.get()); - } - - void Orphan() override { - absl::Notification notification; - test_->work_serializer_->Run( - [notification = ¬ification, - picker = std::move(picker_)]() mutable { - picker.reset(); - notification->Notify(); - }, - DEBUG_LOCATION); - notification.WaitForNotification(); - } - - LoadBalancingPolicy::PickResult Pick( - LoadBalancingPolicy::PickArgs args) override { - return picker_->Pick(args); - } - - private: - LoadBalancingPolicyTest* const test_; - RefCountedPtr picker_; - }; - // Represents an event reported by the LB policy. using Event = absl::variant; @@ -544,19 +502,18 @@ class LoadBalancingPolicyTest : public ::testing::Test { GPR_ASSERT(address_uri.ok()); it = test_->subchannel_pool_ .emplace(std::piecewise_construct, std::forward_as_tuple(key), - std::forward_as_tuple(std::move(*address_uri), test_)) + std::forward_as_tuple(std::move(*address_uri), + work_serializer_)) .first; } - return it->second.CreateSubchannel(); + return it->second.CreateSubchannel(work_serializer_); } void UpdateState( grpc_connectivity_state state, const absl::Status& status, RefCountedPtr picker) override { MutexLock lock(&mu_); - StateUpdate update{ - state, status, - MakeRefCounted(test_, std::move(picker))}; + StateUpdate update{state, status, std::move(picker)}; gpr_log(GPR_INFO, "state update from LB policy: %s", update.ToString().c_str()); queue_.push_back(std::move(update)); @@ -579,12 +536,14 @@ class LoadBalancingPolicyTest : public ::testing::Test { } grpc_event_engine::experimental::EventEngine* GetEventEngine() override { - return test_->fuzzing_ee_.get(); + return event_engine_.get(); } void AddTraceEvent(TraceSeverity, absl::string_view) override {} LoadBalancingPolicyTest* test_; + std::shared_ptr work_serializer_; + std::shared_ptr event_engine_; Mutex mu_; std::deque queue_ ABSL_GUARDED_BY(&mu_); @@ -674,36 +633,10 @@ class LoadBalancingPolicyTest : public ::testing::Test { const absl::optional backend_metric_data_; }; - explicit LoadBalancingPolicyTest(absl::string_view lb_policy_name) - : lb_policy_name_(lb_policy_name) {} - - void SetUp() override { - // Order is important here: Fuzzing EE needs to be created before - // grpc_init(), and the POSIX EE (which is used by the WorkSerializer) - // needs to be created after grpc_init(). - fuzzing_ee_ = - std::make_shared( - grpc_event_engine::experimental::FuzzingEventEngine::Options(), - fuzzing_event_engine::Actions()); - grpc_init(); - event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); - work_serializer_ = std::make_shared(event_engine_); - auto helper = std::make_unique(this); - helper_ = helper.get(); - LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper), - ChannelArgs()}; - lb_policy_ = - CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( - lb_policy_name_, std::move(args)); - GPR_ASSERT(lb_policy_ != nullptr); - } + LoadBalancingPolicyTest() + : work_serializer_(std::make_shared()) {} void TearDown() override { - fuzzing_ee_->FuzzingDone(); - // Make sure pickers (and transitively, subchannels) are unreffed before - // destroying the fixture. - WaitForWorkSerializerToFlush(); - work_serializer_.reset(); // Note: Can't safely trigger this from inside the FakeHelper dtor, // because if there is a picker in the queue that is holding a ref // to the LB policy, that will prevent the LB policy from being @@ -711,18 +644,20 @@ class LoadBalancingPolicyTest : public ::testing::Test { // (This will cause an ASAN failure, but it will not display the // queued events, so the failure will be harder to diagnose.) helper_->ExpectQueueEmpty(); - lb_policy_.reset(); - fuzzing_ee_->TickUntilIdle(); - grpc_event_engine::experimental::WaitForSingleOwner( - std::move(event_engine_)); - event_engine_.reset(); - grpc_shutdown_blocking(); - fuzzing_ee_.reset(); } - LoadBalancingPolicy* lb_policy() const { - GPR_ASSERT(lb_policy_ != nullptr); - return lb_policy_.get(); + // Creates an LB policy of the specified name. + // Creates a new FakeHelper for the new LB policy, and sets helper_ to + // point to the FakeHelper. + OrphanablePtr MakeLbPolicy(absl::string_view name) { + auto helper = + std::make_unique(this, work_serializer_, event_engine_); + helper_ = helper.get(); + LoadBalancingPolicy::Args args = {work_serializer_, std::move(helper), + ChannelArgs()}; + return CoreConfiguration::Get() + .lb_policy_registry() + .CreateLoadBalancingPolicy(name, std::move(args)); } // Creates an LB policy config from json. @@ -764,41 +699,14 @@ class LoadBalancingPolicyTest : public ::testing::Test { LoadBalancingPolicy* lb_policy) { ExecCtx exec_ctx; absl::Status status; - // When the LB policy gets the update, it will create new - // subchannels, and it will register connectivity state watchers and - // optionally health watchers for each one. We don't want to return - // until all the initial notifications for all of those watchers - // have been delivered to the LB policy. absl::Notification notification; work_serializer_->Run( [&]() { status = lb_policy->UpdateLocked(std::move(update_args)); - // UpdateLocked() enqueued the initial connectivity state - // notifications for the subchannels, so we add another - // callback to the queue to be executed after those initial - // state notifications have been delivered. - gpr_log(GPR_INFO, - "Applied update, waiting for initial connectivity state " - "notifications"); - work_serializer_->Run( - [&]() { - gpr_log(GPR_INFO, - "Initial connectivity state notifications delivered; " - "waiting for health notifications"); - // Now that the initial state notifications have been - // delivered, the queue will contain the health watch - // notifications for any subchannels in state READY, - // so we add another callback to the queue to be - // executed after those health watch notifications have - // been delivered. - work_serializer_->Run([&]() { notification.Notify(); }, - DEBUG_LOCATION); - }, - DEBUG_LOCATION); + notification.Notify(); }, DEBUG_LOCATION); notification.WaitForNotification(); - gpr_log(GPR_INFO, "health notifications delivered"); return status; } @@ -951,7 +859,6 @@ class LoadBalancingPolicyTest : public ::testing::Test { return false; // Stop. }, location); - gpr_log(GPR_INFO, "done waiting for expected RR addresses"); return retval; } @@ -1108,30 +1015,18 @@ class LoadBalancingPolicyTest : public ::testing::Test { RefCountedPtr ExpectRoundRobinStartup( absl::Span addresses) { RefCountedPtr picker; - // RR should have created a subchannel for each address. for (size_t i = 0; i < addresses.size(); ++i) { auto* subchannel = FindSubchannel(addresses[i]); EXPECT_NE(subchannel, nullptr); if (subchannel == nullptr) return nullptr; - // RR should ask each subchannel to connect. EXPECT_TRUE(subchannel->ConnectionRequested()); subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); - // Expect the initial CONNECTNG update with a picker that queues. if (i == 0) ExpectConnectingUpdate(); - // The connection attempts succeed. subchannel->SetConnectivityState(GRPC_CHANNEL_READY); if (i == 0) { - // When the first subchannel becomes READY, accept any number of - // CONNECTING updates with a picker that queues followed by a READY - // update with a picker that repeatedly returns only the first address. picker = WaitForConnected(); ExpectRoundRobinPicks(picker.get(), {addresses[0]}); } else { - // When each subsequent subchannel becomes READY, we accept any number - // of READY updates where the picker returns only the previously - // connected subchannel(s) followed by a READY update where the picker - // returns the previously connected subchannel(s) *and* the newly - // connected subchannel. picker = WaitForRoundRobinListChange( absl::MakeSpan(addresses).subspan(0, i), absl::MakeSpan(addresses).subspan(0, i + 1)); @@ -1196,57 +1091,87 @@ class LoadBalancingPolicyTest : public ::testing::Test { SubchannelKey key(MakeAddress(address), args); auto it = subchannel_pool_ .emplace(std::piecewise_construct, std::forward_as_tuple(key), - std::forward_as_tuple(address, this)) + std::forward_as_tuple(address, work_serializer_)) .first; return &it->second; } - void WaitForWorkSerializerToFlush() { - gpr_log(GPR_INFO, "waiting for WorkSerializer to flush..."); - absl::Notification notification; - work_serializer_->Run([&]() { notification.Notify(); }, DEBUG_LOCATION); - notification.WaitForNotification(); - gpr_log(GPR_INFO, "WorkSerializer flush complete"); + std::shared_ptr work_serializer_; + std::shared_ptr event_engine_ = + grpc_event_engine::experimental::GetDefaultEventEngine(); + FakeHelper* helper_ = nullptr; + std::map subchannel_pool_; +}; + +// A subclass to be used for LB policies that start timers. +// Injects a mock EventEngine and provides the necessary framework for +// incrementing time and handling timer callbacks. +class TimeAwareLoadBalancingPolicyTest : public LoadBalancingPolicyTest { + protected: + // A custom time cache for which InvalidateCache() is a no-op. This + // ensures that when the timer callback instantiates its own ExecCtx + // and therefore its own ScopedTimeCache, it continues to see the time + // that we are injecting in the test. + class TestTimeCache final : public Timestamp::ScopedSource { + public: + TestTimeCache() : cached_time_(previous()->Now()) {} + + Timestamp Now() override { return cached_time_; } + void InvalidateCache() override {} + + void IncrementBy(Duration duration) { cached_time_ += duration; } + + private: + Timestamp cached_time_; + }; + + TimeAwareLoadBalancingPolicyTest() { + auto mock_ee = + std::make_shared(); + auto capture = [this](std::chrono::duration duration, + absl::AnyInvocable callback) { + CheckExpectedTimerDuration(duration); + intptr_t key = next_key_++; + timer_callbacks_[key] = std::move(callback); + return grpc_event_engine::experimental::EventEngine::TaskHandle{key, 0}; + }; + ON_CALL(*mock_ee, + RunAfter(::testing::_, ::testing::A>())) + .WillByDefault(capture); + auto cancel = + [this]( + grpc_event_engine::experimental::EventEngine::TaskHandle handle) { + auto it = timer_callbacks_.find(handle.keys[0]); + if (it == timer_callbacks_.end()) return false; + timer_callbacks_.erase(it); + return true; + }; + ON_CALL(*mock_ee, Cancel(::testing::_)).WillByDefault(cancel); + // Store in base class, to make it visible to the LB policy. + event_engine_ = std::move(mock_ee); } - void IncrementTimeBy(Duration duration) { - fuzzing_ee_->TickForDuration(duration); - // Flush WorkSerializer, in case the timer callback enqueued anything. - WaitForWorkSerializerToFlush(); + ~TimeAwareLoadBalancingPolicyTest() override { + EXPECT_TRUE(timer_callbacks_.empty()) + << "WARNING: Test did not run all timer callbacks"; } - void SetExpectedTimerDuration( - absl::optional - duration) { - if (duration.has_value()) { - fuzzing_ee_->SetRunAfterDurationCallback( - [expected = *duration]( - grpc_event_engine::experimental::EventEngine::Duration duration) { - EXPECT_EQ(duration, expected) - << "Expected: " << expected.count() - << "ns\nActual: " << duration.count() << "ns"; - }); - } else { - fuzzing_ee_->SetRunAfterDurationCallback(nullptr); - } + void RunTimerCallback() { + ASSERT_EQ(timer_callbacks_.size(), 1UL); + auto it = timer_callbacks_.begin(); + ASSERT_NE(it->second, nullptr); + std::move(it->second)(); + timer_callbacks_.erase(it); } - std::shared_ptr - fuzzing_ee_; - // TODO(ctiller): this is a normal event engine, yet it gets its time measure - // from fuzzing_ee_ -- results are likely to be a little funky, but seem to do - // well enough for the tests we have today. - // We should transition everything here to just use fuzzing_ee_, but that - // needs some thought on how to Tick() at appropriate times, as there are - // Notification objects buried everywhere in this code, and - // WaitForNotification is deeply incompatible with a single threaded event - // engine that doesn't run callbacks until its public Tick method is called. - std::shared_ptr event_engine_; - std::shared_ptr work_serializer_; - FakeHelper* helper_ = nullptr; - std::map subchannel_pool_; - OrphanablePtr lb_policy_; - const absl::string_view lb_policy_name_; + // Called when the LB policy starts a timer. + // May be overridden by subclasses. + virtual void CheckExpectedTimerDuration( + grpc_event_engine::experimental::EventEngine::Duration) {} + + std::map> timer_callbacks_; + intptr_t next_key_ = 1; + TestTimeCache time_cache_; }; } // namespace testing diff --git a/test/core/client_channel/lb_policy/outlier_detection_test.cc b/test/core/client_channel/lb_policy/outlier_detection_test.cc index b57cfc9008c..ad218ad70bb 100644 --- a/test/core/client_channel/lb_policy/outlier_detection_test.cc +++ b/test/core/client_channel/lb_policy/outlier_detection_test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -30,12 +31,14 @@ #include "absl/types/optional.h" #include "gtest/gtest.h" +#include #include #include #include #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/json/json.h" @@ -47,7 +50,7 @@ namespace grpc_core { namespace testing { namespace { -class OutlierDetectionTest : public LoadBalancingPolicyTest { +class OutlierDetectionTest : public TimeAwareLoadBalancingPolicyTest { protected: class ConfigBuilder { public: @@ -143,12 +146,7 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest { }; OutlierDetectionTest() - : LoadBalancingPolicyTest("outlier_detection_experimental") {} - - void SetUp() override { - LoadBalancingPolicyTest::SetUp(); - SetExpectedTimerDuration(std::chrono::seconds(10)); - } + : lb_policy_(MakeLbPolicy("outlier_detection_experimental")) {} absl::optional DoPickWithFailedCall( LoadBalancingPolicy::SubchannelPicker* picker) { @@ -166,13 +164,25 @@ class OutlierDetectionTest : public LoadBalancingPolicyTest { } return address; } + + void CheckExpectedTimerDuration( + grpc_event_engine::experimental::EventEngine::Duration duration) + override { + EXPECT_EQ(duration, expected_internal_) + << "Expected: " << expected_internal_.count() << "ns" + << "\n Actual: " << duration.count() << "ns"; + } + + OrphanablePtr lb_policy_; + grpc_event_engine::experimental::EventEngine::Duration expected_internal_ = + std::chrono::seconds(10); }; TEST_F(OutlierDetectionTest, Basic) { constexpr absl::string_view kAddressUri = "ipv4:127.0.0.1:443"; // Send an update containing one address. absl::Status status = ApplyUpdate( - BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy()); + BuildUpdate({kAddressUri}, ConfigBuilder().Build()), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for the address. auto* subchannel = FindSubchannel(kAddressUri); @@ -206,7 +216,7 @@ TEST_F(OutlierDetectionTest, FailurePercentage) { .SetFailurePercentageMinimumHosts(1) .SetFailurePercentageRequestVolume(1) .Build()), - lb_policy()); + lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // Expect normal startup. auto picker = ExpectRoundRobinStartup(kAddresses); @@ -217,7 +227,8 @@ TEST_F(OutlierDetectionTest, FailurePercentage) { ASSERT_TRUE(address.has_value()); gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str()); // Advance time and run the timer callback to trigger ejection. - IncrementTimeBy(Duration::Seconds(10)); + time_cache_.IncrementBy(Duration::Seconds(10)); + RunTimerCallback(); gpr_log(GPR_INFO, "### ejection complete"); if (!IsRoundRobinDelegateToPickFirstEnabled()) ExpectReresolutionRequest(); // Expect a picker update. @@ -240,7 +251,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) { .SetFailurePercentageRequestVolume(1) .SetChildPolicy({{"pick_first", Json::FromObject({})}}) .Build()), - lb_policy()); + lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for the first address. auto* subchannel = FindSubchannel(kAddresses[0]); @@ -268,7 +279,8 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) { ASSERT_TRUE(address.has_value()); gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str()); // Advance time and run the timer callback to trigger ejection. - IncrementTimeBy(Duration::Seconds(10)); + time_cache_.IncrementBy(Duration::Seconds(10)); + RunTimerCallback(); gpr_log(GPR_INFO, "### ejection timer pass complete"); // Subchannel should not be ejected. ExpectQueueEmpty(); @@ -283,5 +295,8 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index c310be443af..8547e910368 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -25,7 +25,6 @@ #include "absl/status/status.h" #include "absl/strings/string_view.h" -#include "absl/synchronization/notification.h" #include "absl/types/optional.h" #include "absl/types/span.h" #include "gmock/gmock.h" @@ -35,6 +34,7 @@ #include #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/json/json.h" @@ -48,7 +48,7 @@ namespace { class PickFirstTest : public LoadBalancingPolicyTest { protected: - PickFirstTest() : LoadBalancingPolicyTest("pick_first") {} + PickFirstTest() : lb_policy_(MakeLbPolicy("pick_first")) {} static RefCountedPtr MakePickFirstConfig( absl::optional shuffle_address_list = absl::nullopt) { @@ -65,19 +65,9 @@ class PickFirstTest : public LoadBalancingPolicyTest { void GetOrderAddressesArePicked( absl::Span addresses, std::vector* out_address_order) { + work_serializer_->Run([&]() { lb_policy_->ExitIdleLocked(); }, + DEBUG_LOCATION); out_address_order->clear(); - // Note: ExitIdle() will enqueue a bunch of connectivity state - // notifications on the WorkSerializer, and we want to wait until - // those are delivered to the LB policy. - absl::Notification notification; - work_serializer_->Run( - [&]() { - lb_policy()->ExitIdleLocked(); - work_serializer_->Run([&]() { notification.Notify(); }, - DEBUG_LOCATION); - }, - DEBUG_LOCATION); - notification.WaitForNotification(); // Construct a map of subchannel to address. // We will remove entries as each subchannel starts to connect. std::map subchannels; @@ -132,6 +122,8 @@ class PickFirstTest : public LoadBalancingPolicyTest { subchannels.erase(subchannel); } } + + OrphanablePtr lb_policy_; }; TEST_F(PickFirstTest, FirstAddressWorks) { @@ -139,7 +131,7 @@ TEST_F(PickFirstTest, FirstAddressWorks) { constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for both addresses. auto* subchannel = FindSubchannel(kAddresses[0]); @@ -172,7 +164,7 @@ TEST_F(PickFirstTest, FirstAddressFails) { constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for both addresses. auto* subchannel = FindSubchannel(kAddresses[0]); @@ -224,7 +216,7 @@ TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) { absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for all addresses. auto* subchannel3 = FindSubchannel(kAddresses[2]); @@ -267,7 +259,7 @@ TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) { absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // The LB policy should request re-resolution. ExpectReresolutionRequest(); @@ -313,7 +305,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) { absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // The LB policy should request re-resolution. ExpectReresolutionRequest(); @@ -332,7 +324,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) { constexpr std::array kAddresses2 = { kAddresses[0], "ipv4:127.0.0.1:445"}; status = ApplyUpdate(BuildUpdate(kAddresses2, MakePickFirstConfig(false)), - lb_policy()); + lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // The LB policy should have created a subchannel for the new address. auto* subchannel3 = FindSubchannel(kAddresses2[1]); @@ -357,7 +349,7 @@ TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) { constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for both addresses. auto* subchannel = FindSubchannel(kAddresses[0]); @@ -418,7 +410,7 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) { constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for both addresses. auto* subchannel = FindSubchannel(kAddresses[0]); @@ -453,13 +445,6 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) { // By checking the picker, we told the LB policy to trigger a new // connection attempt, so it should start over with the first // subchannel. - // Note that the picker will have enqueued the ExitIdle() call in the - // WorkSerializer, so the first flush will execute that call. But - // executing that call will result in enqueueing subchannel - // connectivity state notifications, so we need to flush again to make - // sure all of that work is done before we continue. - WaitForWorkSerializerToFlush(); - WaitForWorkSerializerToFlush(); EXPECT_TRUE(subchannel->ConnectionRequested()); // The subchannel starts connecting. subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); @@ -488,7 +473,7 @@ TEST_F(PickFirstTest, WithShuffle) { bool shuffled = false; for (size_t i = 0; i < kMaxTries; ++i) { absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(true)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; GetOrderAddressesArePicked(kAddresses, &addresses_after_update); if (absl::MakeConstSpan(addresses_after_update) != @@ -511,7 +496,7 @@ TEST_F(PickFirstTest, ShufflingDisabled) { constexpr static size_t kMaxAttempts = 5; for (size_t attempt = 0; attempt < kMaxAttempts; ++attempt) { absl::Status status = ApplyUpdate( - BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy()); + BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; std::vector address_order; GetOrderAddressesArePicked(kAddresses, &address_order); @@ -525,5 +510,8 @@ TEST_F(PickFirstTest, ShufflingDisabled) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } diff --git a/test/core/client_channel/lb_policy/round_robin_test.cc b/test/core/client_channel/lb_policy/round_robin_test.cc index 665ecf9bb09..092242e66f3 100644 --- a/test/core/client_channel/lb_policy/round_robin_test.cc +++ b/test/core/client_channel/lb_policy/round_robin_test.cc @@ -14,6 +14,8 @@ // limitations under the License. // +#include + #include #include "absl/status/status.h" @@ -21,6 +23,11 @@ #include "absl/types/span.h" #include "gtest/gtest.h" +#include + +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/load_balancing/lb_policy.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" #include "test/core/util/test_config.h" @@ -30,33 +37,68 @@ namespace { class RoundRobinTest : public LoadBalancingPolicyTest { protected: - RoundRobinTest() : LoadBalancingPolicyTest("round_robin") {} + RoundRobinTest() : lb_policy_(MakeLbPolicy("round_robin")) {} + + void ExpectStartup(absl::Span addresses) { + EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, nullptr), lb_policy_.get()), + absl::OkStatus()); + // RR should have created a subchannel for each address. + for (size_t i = 0; i < addresses.size(); ++i) { + auto* subchannel = FindSubchannel(addresses[i]); + ASSERT_NE(subchannel, nullptr) << "Address: " << addresses[i]; + // RR should ask each subchannel to connect. + EXPECT_TRUE(subchannel->ConnectionRequested()); + // The subchannel will connect successfully. + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + // Expect the initial CONNECTNG update with a picker that queues. + if (i == 0) ExpectConnectingUpdate(); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + // As each subchannel becomes READY, we should get a new picker that + // includes the behavior. Note that there may be any number of + // duplicate updates for the previous state in the queue before the + // update that we actually want to see. + if (i == 0) { + // When the first subchannel becomes READY, accept any number of + // CONNECTING updates with a picker that queues followed by a READY + // update with a picker that repeatedly returns only the first address. + auto picker = WaitForConnected(); + ExpectRoundRobinPicks(picker.get(), {addresses[0]}); + } else { + // When each subsequent subchannel becomes READY, we accept any number + // of READY updates where the picker returns only the previously + // connected subchannel(s) followed by a READY update where the picker + // returns the previously connected subchannel(s) *and* the newly + // connected subchannel. + WaitForRoundRobinListChange( + absl::MakeSpan(addresses).subspan(0, i), + absl::MakeSpan(addresses).subspan(0, i + 1)); + } + } + } + + OrphanablePtr lb_policy_; }; TEST_F(RoundRobinTest, Basic) { const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, nullptr), lb_policy()), - absl::OkStatus()); - ExpectRoundRobinStartup(kAddresses); + ExpectStartup(kAddresses); } TEST_F(RoundRobinTest, AddressUpdates) { const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, nullptr), lb_policy()), - absl::OkStatus()); - ExpectRoundRobinStartup(kAddresses); + ExpectStartup(kAddresses); // Send update to remove address 2. EXPECT_EQ( ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).first(2), nullptr), - lb_policy()), + lb_policy_.get()), absl::OkStatus()); WaitForRoundRobinListChange(kAddresses, absl::MakeSpan(kAddresses).first(2)); // Send update to remove address 0 and re-add address 2. EXPECT_EQ( ApplyUpdate(BuildUpdate(absl::MakeSpan(kAddresses).last(2), nullptr), - lb_policy()), + lb_policy_.get()), absl::OkStatus()); WaitForRoundRobinListChange(absl::MakeSpan(kAddresses).first(2), absl::MakeSpan(kAddresses).last(2)); @@ -73,5 +115,8 @@ TEST_F(RoundRobinTest, AddressUpdates) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } diff --git a/test/core/client_channel/lb_policy/weighted_round_robin_test.cc b/test/core/client_channel/lb_policy/weighted_round_robin_test.cc index 74a3dfae1eb..77020f7e92c 100644 --- a/test/core/client_channel/lb_policy/weighted_round_robin_test.cc +++ b/test/core/client_channel/lb_policy/weighted_round_robin_test.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -35,12 +36,14 @@ #include "absl/types/span.h" #include "gtest/gtest.h" +#include #include #include #include #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/json/json.h" @@ -64,7 +67,7 @@ BackendMetricData MakeBackendMetricData(double app_utilization, double qps, return b; } -class WeightedRoundRobinTest : public LoadBalancingPolicyTest { +class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest { protected: class ConfigBuilder { public: @@ -110,11 +113,8 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest { Json::Object json_; }; - WeightedRoundRobinTest() : LoadBalancingPolicyTest("weighted_round_robin") {} - - void SetUp() override { - LoadBalancingPolicyTest::SetUp(); - SetExpectedTimerDuration(std::chrono::seconds(1)); + WeightedRoundRobinTest() { + lb_policy_ = MakeLbPolicy("weighted_round_robin"); } RefCountedPtr @@ -125,7 +125,7 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest { SourceLocation location = SourceLocation()) { if (update_addresses.empty()) update_addresses = addresses; EXPECT_EQ(ApplyUpdate(BuildUpdate(update_addresses, config_builder.Build()), - lb_policy()), + lb_policy_.get()), absl::OkStatus()); // Expect the initial CONNECTNG update with a picker that queues. ExpectConnectingUpdate(location); @@ -311,11 +311,24 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest { if (*picker == nullptr) return false; } else if (run_timer_callbacks) { gpr_log(GPR_INFO, "running timer callback..."); - // Increment time and run any timer callbacks. - IncrementTimeBy(Duration::Seconds(1)); + RunTimerCallback(); } + // Increment time. + time_cache_.IncrementBy(Duration::Seconds(1)); } } + + void CheckExpectedTimerDuration( + grpc_event_engine::experimental::EventEngine::Duration duration) + override { + EXPECT_EQ(duration, expected_weight_update_interval_) + << "Expected: " << expected_weight_update_interval_.count() << "ns" + << "\n Actual: " << duration.count() << "ns"; + } + + OrphanablePtr lb_policy_; + grpc_event_engine::experimental::EventEngine::Duration + expected_weight_update_interval_ = std::chrono::seconds(1); }; TEST_F(WeightedRoundRobinTest, Basic) { @@ -630,7 +643,7 @@ TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) { TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) { const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - SetExpectedTimerDuration(std::chrono::seconds(2)); + expected_weight_update_interval_ = std::chrono::seconds(2); auto picker = SendInitialUpdateAndWaitForConnected( kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Seconds(2))); ASSERT_NE(picker, nullptr); @@ -648,7 +661,7 @@ TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) { TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) { const std::array kAddresses = { "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; - SetExpectedTimerDuration(std::chrono::milliseconds(100)); + expected_weight_update_interval_ = std::chrono::milliseconds(100); auto picker = SendInitialUpdateAndWaitForConnected( kAddresses, ConfigBuilder().SetWeightUpdatePeriod(Duration::Milliseconds(10))); @@ -684,7 +697,8 @@ TEST_F(WeightedRoundRobinTest, WeightExpirationPeriod) { {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}}); // Advance time to make weights stale and trigger the timer callback // to recompute weights. - IncrementTimeBy(Duration::Seconds(2)); + time_cache_.IncrementBy(Duration::Seconds(2)); + RunTimerCallback(); // Picker should now be falling back to round-robin. ExpectWeightedRoundRobinPicks( picker.get(), {}, @@ -711,7 +725,8 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) { {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}}); // Advance time to make weights stale and trigger the timer callback // to recompute weights. - IncrementTimeBy(Duration::Seconds(2)); + time_cache_.IncrementBy(Duration::Seconds(2)); + RunTimerCallback(); // Picker should now be falling back to round-robin. ExpectWeightedRoundRobinPicks( picker.get(), {}, @@ -729,7 +744,8 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) { {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}}); // Advance time past the blackout period. This should cause the // weights to be used. - IncrementTimeBy(Duration::Seconds(1)); + time_cache_.IncrementBy(Duration::Seconds(1)); + RunTimerCallback(); ExpectWeightedRoundRobinPicks( picker.get(), {}, {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 1}}); @@ -775,7 +791,8 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) { {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}}); // Advance time to exceed the blackout period and trigger the timer // callback to recompute weights. - IncrementTimeBy(Duration::Seconds(1)); + time_cache_.IncrementBy(Duration::Seconds(1)); + RunTimerCallback(); ExpectWeightedRoundRobinPicks( picker.get(), {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3, @@ -807,9 +824,9 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodDoesNotGetResetAfterUpdate) { /*qps=*/100.0, /*eps=*/0.0)}}, {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}}); // Send a duplicate update with the same addresses and config. - EXPECT_EQ( - ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()), lb_policy()), - absl::OkStatus()); + EXPECT_EQ(ApplyUpdate(BuildUpdate(kAddresses, config_builder.Build()), + lb_policy_.get()), + absl::OkStatus()); // Note that we have not advanced time, so if the update incorrectly // triggers resetting the blackout period, none of the weights will // actually be used. @@ -852,5 +869,8 @@ TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc index e7c73794946..1395542026e 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -34,6 +34,7 @@ #include "src/core/ext/filters/stateful_session/stateful_session_filter.h" #include "src/core/ext/xds/xds_health_status.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" @@ -47,7 +48,7 @@ namespace { class XdsOverrideHostTest : public LoadBalancingPolicyTest { protected: XdsOverrideHostTest() - : LoadBalancingPolicyTest("xds_override_host_experimental") {} + : policy_(MakeLbPolicy("xds_override_host_experimental")) {} static RefCountedPtr MakeXdsOverrideHostConfig( absl::Span override_host_status = {"UNKNOWN", @@ -71,7 +72,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { RefCountedPtr ExpectStartupWithRoundRobin(absl::Span addresses) { EXPECT_EQ(ApplyUpdate(BuildUpdate(addresses, MakeXdsOverrideHostConfig()), - lb_policy()), + policy_.get()), absl::OkStatus()); return ExpectRoundRobinStartup(addresses); } @@ -95,7 +96,7 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { update.addresses->push_back(MakeAddressWithHealthStatus( address_and_status.first, address_and_status.second)); } - EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus()); + EXPECT_EQ(ApplyUpdate(update, policy_.get()), absl::OkStatus()); } CallAttributes MakeOverrideHostAttribute(absl::string_view host) { @@ -104,6 +105,8 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { std::make_unique(host)); return override_host_attributes; } + + OrphanablePtr policy_; }; TEST_F(XdsOverrideHostTest, DelegatesToChild) { @@ -115,7 +118,7 @@ TEST_F(XdsOverrideHostTest, NoConfigReportsError) { EXPECT_EQ( ApplyUpdate( BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr), - lb_policy()), + policy_.get()), absl::InvalidArgumentError("Missing policy config")); } @@ -161,7 +164,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) { // Some other address is gone EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[1]}, MakeXdsOverrideHostConfig()), - lb_policy()), + policy_.get()), absl::OkStatus()); // Wait for LB policy to return a new picker that uses the updated // addresses. We can't use the host override for this, because then @@ -175,7 +178,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) { // "Our" address is gone so others get returned in round-robin order EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[0], kAddresses[2]}, MakeXdsOverrideHostConfig()), - lb_policy()), + policy_.get()), absl::OkStatus()); // Wait for LB policy to return the new picker. // In this case, we can pass call_attributes while we wait instead of @@ -187,7 +190,7 @@ TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) { // And now it is back EXPECT_EQ(ApplyUpdate(BuildUpdate({kAddresses[1], kAddresses[2]}, MakeXdsOverrideHostConfig()), - lb_policy()), + policy_.get()), absl::OkStatus()); // Wait for LB policy to return the new picker. picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]}, @@ -335,7 +338,6 @@ TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) { // picks where the override host is CONNECTING. All picks without an // override host should not use this host. gpr_log(GPR_INFO, "### subchannel starts reconnecting"); - WaitForWorkSerializerToFlush(); EXPECT_TRUE(subchannel->ConnectionRequested()); ExpectQueueEmpty(); subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); @@ -467,5 +469,8 @@ TEST_F(XdsOverrideHostTest, OverrideHostStatus) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; } diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 4d76c942218..998f77398e6 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -411,8 +411,7 @@ static void test_cooldown() { TEST(DnsResolverCooldownTest, MainTest) { grpc_init(); - auto work_serializer = std::make_shared( - grpc_event_engine::experimental::GetDefaultEventEngine()); + auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; g_default_dns_lookup_ares = grpc_dns_lookup_hostname_ares; diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index 41c22cc4b48..735370b96c2 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -87,8 +87,7 @@ static void test_fails(grpc_core::ResolverFactory* factory, } TEST(DnsResolverTest, MainTest) { - auto work_serializer = std::make_shared( - grpc_event_engine::experimental::GetDefaultEventEngine()); + auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; grpc_core::ResolverFactory* dns = grpc_core::CoreConfiguration::Get() diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc index 91926eb810d..3f9d50470b4 100644 --- a/test/core/client_channel/resolvers/fake_resolver_test.cc +++ b/test/core/client_channel/resolvers/fake_resolver_test.cc @@ -22,11 +22,9 @@ #include #include -#include #include #include #include -#include #include #include @@ -42,8 +40,6 @@ #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/event_engine/default_event_engine.h" -#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/work_serializer.h" @@ -59,14 +55,12 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { public: void SetExpectedAndEvent(grpc_core::Resolver::Result expected, gpr_event* ev) { - grpc_core::MutexLock lock(&mu_); ASSERT_EQ(ev_, nullptr); expected_ = std::move(expected); ev_ = ev; } void ReportResult(grpc_core::Resolver::Result actual) override { - grpc_core::MutexLock lock(&mu_); ASSERT_NE(ev_, nullptr); // We only check the addresses, because that's the only thing // explicitly set by the test via @@ -81,9 +75,8 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { } private: - grpc_core::Mutex mu_; - grpc_core::Resolver::Result expected_ ABSL_GUARDED_BY(mu_); - gpr_event* ev_ ABSL_GUARDED_BY(mu_) = nullptr; + grpc_core::Resolver::Result expected_; + gpr_event* ev_ = nullptr; }; static grpc_core::OrphanablePtr build_fake_resolver( @@ -131,18 +124,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() { TEST(FakeResolverTest, FakeResolver) { grpc_core::ExecCtx exec_ctx; std::shared_ptr work_serializer = - std::make_shared( - grpc_event_engine::experimental::GetDefaultEventEngine()); - auto synchronously = [work_serializer](std::function do_this_thing) { - grpc_core::Notification notification; - work_serializer->Run( - [do_this_thing = std::move(do_this_thing), ¬ification]() mutable { - do_this_thing(); - notification.Notify(); - }, - DEBUG_LOCATION); - notification.WaitForNotification(); - }; + std::make_shared(); // Create resolver. ResultHandler* result_handler = new ResultHandler(); grpc_core::RefCountedPtr @@ -152,7 +134,7 @@ TEST(FakeResolverTest, FakeResolver) { work_serializer, response_generator.get(), std::unique_ptr(result_handler)); ASSERT_NE(resolver.get(), nullptr); - synchronously([resolver = resolver.get()] { resolver->StartLocked(); }); + resolver->StartLocked(); // Test 1: normal resolution. // next_results != NULL, reresolution_results == NULL. // Expected response is next_results. @@ -161,7 +143,7 @@ TEST(FakeResolverTest, FakeResolver) { gpr_event ev1; gpr_event_init(&ev1); result_handler->SetExpectedAndEvent(result, &ev1); - response_generator->SetResponseSynchronously(std::move(result)); + response_generator->SetResponse(std::move(result)); grpc_core::ExecCtx::Get()->Flush(); ASSERT_NE(gpr_event_wait(&ev1, grpc_timeout_seconds_to_deadline(5)), nullptr); // Test 2: update resolution. @@ -172,7 +154,7 @@ TEST(FakeResolverTest, FakeResolver) { gpr_event ev2; gpr_event_init(&ev2); result_handler->SetExpectedAndEvent(result, &ev2); - response_generator->SetResponseSynchronously(std::move(result)); + response_generator->SetResponse(std::move(result)); grpc_core::ExecCtx::Get()->Flush(); ASSERT_NE(gpr_event_wait(&ev2, grpc_timeout_seconds_to_deadline(5)), nullptr); // Test 3: normal re-resolution. @@ -186,11 +168,10 @@ TEST(FakeResolverTest, FakeResolver) { result_handler->SetExpectedAndEvent(reresolution_result, &ev3); // Set reresolution_results. // No result will be returned until re-resolution is requested. - response_generator->SetReresolutionResponseSynchronously(reresolution_result); + response_generator->SetReresolutionResponse(reresolution_result); grpc_core::ExecCtx::Get()->Flush(); // Trigger a re-resolution. - synchronously( - [resolver = resolver.get()] { resolver->RequestReresolutionLocked(); }); + resolver->RequestReresolutionLocked(); grpc_core::ExecCtx::Get()->Flush(); ASSERT_NE(gpr_event_wait(&ev3, grpc_timeout_seconds_to_deadline(5)), nullptr); // Test 4: repeat re-resolution. @@ -201,8 +182,7 @@ TEST(FakeResolverTest, FakeResolver) { gpr_event_init(&ev4); result_handler->SetExpectedAndEvent(std::move(reresolution_result), &ev4); // Trigger a re-resolution. - synchronously( - [resolver = resolver.get()] { resolver->RequestReresolutionLocked(); }); + resolver->RequestReresolutionLocked(); grpc_core::ExecCtx::Get()->Flush(); ASSERT_NE(gpr_event_wait(&ev4, grpc_timeout_seconds_to_deadline(5)), nullptr); // Test 5: normal resolution. @@ -213,7 +193,7 @@ TEST(FakeResolverTest, FakeResolver) { gpr_event ev5; gpr_event_init(&ev5); result_handler->SetExpectedAndEvent(result, &ev5); - response_generator->SetResponseSynchronously(std::move(result)); + response_generator->SetResponse(std::move(result)); grpc_core::ExecCtx::Get()->Flush(); ASSERT_NE(gpr_event_wait(&ev5, grpc_timeout_seconds_to_deadline(5)), nullptr); // Test 6: no-op. diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc index b0485c639bd..b600214ec30 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc @@ -28,7 +28,6 @@ #include #include "src/core/lib/config/core_configuration.h" -#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -89,8 +88,7 @@ static void test_fails(grpc_core::ResolverFactory* factory, } TEST(SockaddrResolverTest, MainTest) { - auto work_serializer = std::make_shared( - grpc_event_engine::experimental::GetDefaultEventEngine()); + auto work_serializer = std::make_shared(); g_work_serializer = &work_serializer; grpc_core::ResolverFactory* ipv4 = grpc_core::CoreConfiguration::Get() diff --git a/test/core/end2end/no_server_test.cc b/test/core/end2end/no_server_test.cc index 8172ae83bd8..60f940cdedb 100644 --- a/test/core/end2end/no_server_test.cc +++ b/test/core/end2end/no_server_test.cc @@ -80,7 +80,6 @@ void run_test(bool wait_for_ready) { grpc_core::CqVerifier::tag(1), nullptr)); { - response_generator->WaitForResolverSet(); grpc_core::ExecCtx exec_ctx; response_generator->SetFailure(); } diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index 1442987828e..a5441e2b962 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -221,10 +221,9 @@ grpc_cc_library( "absl/time", ], deps = [ - "//:event_engine_base_hdrs", "//:gpr", + "//:grpc_unsecure", "//src/core:channel_args_endpoint_config", - "//src/core:event_engine_common", "//src/core:event_engine_tcp_socket_utils", "//src/core:memory_quota", "//src/core:notification", diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc index ec781e0ceb5..b3ea892faa1 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc @@ -202,12 +202,6 @@ void FuzzingEventEngine::TickForDuration(grpc_core::Duration d) { TickUntilTimestamp(grpc_core::Timestamp::Now() + d); } -void FuzzingEventEngine::SetRunAfterDurationCallback( - absl::AnyInvocable callback) { - grpc_core::MutexLock lock(&run_after_duration_callback_mu_); - run_after_duration_callback_ = std::move(callback); -} - FuzzingEventEngine::Time FuzzingEventEngine::Now() { grpc_core::MutexLock lock(&*now_mu_); return now_; @@ -440,10 +434,8 @@ EventEngine::ConnectionHandle FuzzingEventEngine::Connect( // TODO(ctiller): do something with the timeout // Schedule a timer to run (with some fuzzer selected delay) the on_connect // callback. - grpc_core::MutexLock lock(&*mu_); - auto task_handle = RunAfterLocked( - RunType::kRunAfter, Duration(0), - [this, addr, on_connect = std::move(on_connect)]() mutable { + auto task_handle = RunAfter( + Duration(0), [this, addr, on_connect = std::move(on_connect)]() mutable { // Check for a legal address and extract the target port number. auto port = ResolvedAddressGetPort(addr); grpc_core::MutexLock lock(&*mu_); @@ -497,14 +489,11 @@ FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) { } void FuzzingEventEngine::Run(Closure* closure) { - grpc_core::MutexLock lock(&*mu_); - RunAfterLocked(RunType::kRunAfter, Duration::zero(), - [closure]() { closure->Run(); }); + RunAfter(Duration::zero(), closure); } void FuzzingEventEngine::Run(absl::AnyInvocable closure) { - grpc_core::MutexLock lock(&*mu_); - RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure)); + RunAfter(Duration::zero(), std::move(closure)); } EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when, @@ -514,12 +503,6 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when, EventEngine::TaskHandle FuzzingEventEngine::RunAfter( Duration when, absl::AnyInvocable closure) { - { - grpc_core::MutexLock lock(&run_after_duration_callback_mu_); - if (run_after_duration_callback_ != nullptr) { - run_after_duration_callback_(when); - } - } grpc_core::MutexLock lock(&*mu_); // (b/258949216): Cap it to one year to avoid integer overflow errors. return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear), diff --git a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h index af58c7b8d59..6a1c34f3150 100644 --- a/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h +++ b/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -79,10 +79,6 @@ class FuzzingEventEngine : public EventEngine { // Tick for some grpc_core::Duration void TickForDuration(grpc_core::Duration d) ABSL_LOCKS_EXCLUDED(mu_); - // Sets a callback to be invoked any time RunAfter() is called. - // Allows tests to verify the specified duration. - void SetRunAfterDurationCallback(absl::AnyInvocable callback); - absl::StatusOr> CreateListener( Listener::AcceptCallback on_accept, absl::AnyInvocable on_shutdown, @@ -294,10 +290,6 @@ class FuzzingEventEngine : public EventEngine { std::queue> write_sizes_for_future_connections_ ABSL_GUARDED_BY(mu_); grpc_pick_port_functions previous_pick_port_functions_; - - grpc_core::Mutex run_after_duration_callback_mu_; - absl::AnyInvocable run_after_duration_callback_ - ABSL_GUARDED_BY(run_after_duration_callback_mu_); }; } // namespace experimental diff --git a/test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc b/test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc index 483ad6b0e4a..5c862fa074e 100644 --- a/test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc +++ b/test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.cc @@ -143,12 +143,6 @@ class FuzzingResolverEventEngine } bool Cancel(TaskHandle /* handle */) override { return true; } - void Run(absl::AnyInvocable fn) override { - runner_.Run(std::move(fn)); - } - - void Run(Closure* fn) override { runner_.Run(fn); } - void Tick() { runner_.Tick(); } private: @@ -265,7 +259,7 @@ DEFINE_PROTO_FUZZER(const event_engine_client_channel_resolver::Msg& msg) { grpc_event_engine::experimental::GetDefaultEventEngine()); { // scoped to ensure the resolver is orphaned when done resolving. - auto work_serializer = std::make_shared(engine); + auto work_serializer = std::make_shared(); EventEngineClientChannelDNSResolverFactory resolver_factory; auto resolver_args = ConstructResolverArgs( grpc_core::testing::CreateChannelArgsFromFuzzingConfiguration( diff --git a/test/core/gprpp/BUILD b/test/core/gprpp/BUILD index 283b9a44b3d..09f7c9a281d 100644 --- a/test/core/gprpp/BUILD +++ b/test/core/gprpp/BUILD @@ -383,7 +383,6 @@ grpc_cc_test( deps = [ "//:gpr", "//:grpc", - "//test/core/event_engine:event_engine_test_utils", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/gprpp/work_serializer_test.cc b/test/core/gprpp/work_serializer_test.cc index 8a7259efb1d..e05ad9d8976 100644 --- a/test/core/gprpp/work_serializer_test.cc +++ b/test/core/gprpp/work_serializer_test.cc @@ -32,22 +32,15 @@ #include #include -#include "src/core/lib/event_engine/default_event_engine.h" -#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/thd.h" -#include "test/core/event_engine/event_engine_test_utils.h" #include "test/core/util/test_config.h" -using grpc_event_engine::experimental::GetDefaultEventEngine; -using grpc_event_engine::experimental::WaitForSingleOwner; - -namespace grpc_core { namespace { -TEST(WorkSerializerTest, NoOp) { WorkSerializer lock(GetDefaultEventEngine()); } +TEST(WorkSerializerTest, NoOp) { grpc_core::WorkSerializer lock; } TEST(WorkSerializerTest, ExecuteOneRun) { - WorkSerializer lock(GetDefaultEventEngine()); + grpc_core::WorkSerializer lock; gpr_event done; gpr_event_init(&done); lock.Run([&done]() { gpr_event_set(&done, reinterpret_cast(1)); }, @@ -57,7 +50,7 @@ TEST(WorkSerializerTest, ExecuteOneRun) { } TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) { - WorkSerializer lock(GetDefaultEventEngine()); + grpc_core::WorkSerializer lock; gpr_event done; gpr_event_init(&done); lock.Schedule([&done]() { gpr_event_set(&done, reinterpret_cast(1)); }, @@ -70,7 +63,7 @@ TEST(WorkSerializerTest, ExecuteOneScheduleAndDrain) { class TestThread { public: - explicit TestThread(WorkSerializer* lock) + explicit TestThread(grpc_core::WorkSerializer* lock) : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) { gpr_event_init(&done_); thread_.Start(); @@ -111,14 +104,14 @@ class TestThread { DEBUG_LOCATION); } - WorkSerializer* lock_ = nullptr; - Thread thread_; + grpc_core::WorkSerializer* lock_ = nullptr; + grpc_core::Thread thread_; size_t counter_ = 0; gpr_event done_; }; TEST(WorkSerializerTest, ExecuteMany) { - WorkSerializer lock(GetDefaultEventEngine()); + grpc_core::WorkSerializer lock; { std::vector> threads; for (size_t i = 0; i < 10; ++i) { @@ -129,7 +122,7 @@ TEST(WorkSerializerTest, ExecuteMany) { class TestThreadScheduleAndDrain { public: - explicit TestThreadScheduleAndDrain(WorkSerializer* lock) + explicit TestThreadScheduleAndDrain(grpc_core::WorkSerializer* lock) : lock_(lock), thread_("grpc_execute_many", ExecuteManyLoop, this) { gpr_event_init(&done_); thread_.Start(); @@ -172,14 +165,14 @@ class TestThreadScheduleAndDrain { DEBUG_LOCATION); } - WorkSerializer* lock_ = nullptr; - Thread thread_; + grpc_core::WorkSerializer* lock_ = nullptr; + grpc_core::Thread thread_; size_t counter_ = 0; gpr_event done_; }; TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) { - WorkSerializer lock(GetDefaultEventEngine()); + grpc_core::WorkSerializer lock; { std::vector> threads; for (size_t i = 0; i < 10; ++i) { @@ -189,7 +182,7 @@ TEST(WorkSerializerTest, ExecuteManyScheduleAndDrain) { } TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) { - WorkSerializer lock(GetDefaultEventEngine()); + grpc_core::WorkSerializer lock; { std::vector> run_threads; std::vector> schedule_threads; @@ -203,17 +196,16 @@ TEST(WorkSerializerTest, ExecuteManyMixedRunScheduleAndDrain) { // Tests that work serializers allow destruction from the last callback TEST(WorkSerializerTest, CallbackDestroysWorkSerializer) { - auto lock = std::make_shared(GetDefaultEventEngine()); + auto lock = std::make_shared(); lock->Run([&]() { lock.reset(); }, DEBUG_LOCATION); - WaitForSingleOwner(GetDefaultEventEngine()); } // Tests additional racy conditions when the last callback triggers work // serializer destruction. TEST(WorkSerializerTest, WorkSerializerDestructionRace) { for (int i = 0; i < 1000; ++i) { - auto lock = std::make_shared(GetDefaultEventEngine()); - Notification notification; + auto lock = std::make_shared(); + grpc_core::Notification notification; std::thread t1([&]() { notification.WaitForNotification(); lock.reset(); @@ -226,7 +218,7 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRace) { // Tests racy conditions when the last callback triggers work // serializer destruction. TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) { - auto lock = std::make_shared(GetDefaultEventEngine()); + auto lock = std::make_shared(); absl::Barrier barrier(11); std::vector threads; threads.reserve(10); @@ -245,53 +237,42 @@ TEST(WorkSerializerTest, WorkSerializerDestructionRaceMultipleThreads) { #ifndef NDEBUG TEST(WorkSerializerTest, RunningInWorkSerializer) { - auto work_serializer1 = - std::make_shared(GetDefaultEventEngine()); - auto work_serializer2 = - std::make_shared(GetDefaultEventEngine()); - EXPECT_FALSE(work_serializer1->RunningInWorkSerializer()); - EXPECT_FALSE(work_serializer2->RunningInWorkSerializer()); - work_serializer1->Run( - [=]() { - EXPECT_TRUE(work_serializer1->RunningInWorkSerializer()); - EXPECT_FALSE(work_serializer2->RunningInWorkSerializer()); - work_serializer2->Run( - [=]() { - EXPECT_EQ(work_serializer1->RunningInWorkSerializer(), - !IsWorkSerializerDispatchEnabled()); - EXPECT_TRUE(work_serializer2->RunningInWorkSerializer()); + grpc_core::WorkSerializer work_serializer1; + grpc_core::WorkSerializer work_serializer2; + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); + work_serializer1.Run( + [&]() { + EXPECT_TRUE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); + work_serializer2.Run( + [&]() { + EXPECT_TRUE(work_serializer1.RunningInWorkSerializer()); + EXPECT_TRUE(work_serializer2.RunningInWorkSerializer()); }, DEBUG_LOCATION); }, DEBUG_LOCATION); - EXPECT_FALSE(work_serializer1->RunningInWorkSerializer()); - EXPECT_FALSE(work_serializer2->RunningInWorkSerializer()); - work_serializer2->Run( - [=]() { - EXPECT_FALSE(work_serializer1->RunningInWorkSerializer()); - EXPECT_TRUE(work_serializer2->RunningInWorkSerializer()); - work_serializer1->Run( - [=]() { - EXPECT_TRUE(work_serializer1->RunningInWorkSerializer()); - EXPECT_EQ(work_serializer2->RunningInWorkSerializer(), - !IsWorkSerializerDispatchEnabled()); + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); + work_serializer2.Run( + [&]() { + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_TRUE(work_serializer2.RunningInWorkSerializer()); + work_serializer1.Run( + [&]() { + EXPECT_TRUE(work_serializer1.RunningInWorkSerializer()); + EXPECT_TRUE(work_serializer2.RunningInWorkSerializer()); }, DEBUG_LOCATION); }, DEBUG_LOCATION); - EXPECT_FALSE(work_serializer1->RunningInWorkSerializer()); - EXPECT_FALSE(work_serializer2->RunningInWorkSerializer()); - Notification done1; - Notification done2; - work_serializer1->Run([&done1]() { done1.Notify(); }, DEBUG_LOCATION); - work_serializer2->Run([&done2]() { done2.Notify(); }, DEBUG_LOCATION); - done1.WaitForNotification(); - done2.WaitForNotification(); + EXPECT_FALSE(work_serializer1.RunningInWorkSerializer()); + EXPECT_FALSE(work_serializer2.RunningInWorkSerializer()); } #endif } // namespace -} // namespace grpc_core int main(int argc, char** argv) { grpc::testing::TestEnvironment env(&argc, argv); diff --git a/test/core/iomgr/stranded_event_test.cc b/test/core/iomgr/stranded_event_test.cc index 0991c98b7fe..3260c3d57d4 100644 --- a/test/core/iomgr/stranded_event_test.cc +++ b/test/core/iomgr/stranded_event_test.cc @@ -358,10 +358,9 @@ TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) { grpc_core::MakeRefCounted(); { grpc_core::ExecCtx exec_ctx; - fake_resolver_response_generator->SetResponseSynchronously( - BuildResolverResponse( - {absl::StrCat("ipv4:", kSharedUnconnectableAddress), - absl::StrCat("ipv4:", test_server->address())})); + fake_resolver_response_generator->SetResponse(BuildResolverResponse( + {absl::StrCat("ipv4:", kSharedUnconnectableAddress), + absl::StrCat("ipv4:", test_server->address())})); } args.push_back(grpc_core::FakeResolverResponseGenerator::MakeChannelArg( fake_resolver_response_generator.get())); diff --git a/test/core/transport/chttp2/too_many_pings_test.cc b/test/core/transport/chttp2/too_many_pings_test.cc index 192472c9b28..d888f9a1aab 100644 --- a/test/core/transport/chttp2/too_many_pings_test.cc +++ b/test/core/transport/chttp2/too_many_pings_test.cc @@ -492,9 +492,8 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) { for (int i = 0; i < 3; i++) { gpr_log(GPR_INFO, "Expected keepalive time : %d", expected_keepalive_time_sec); - response_generator->SetResponseSynchronously( - BuildResolverResult({absl::StrCat( - "ipv4:", i % 2 == 0 ? server_address1 : server_address2)})); + response_generator->SetResponse(BuildResolverResult({absl::StrCat( + "ipv4:", i % 2 == 0 ? server_address1 : server_address2)})); // ExecCtx::Flush() might not be enough to make sure that the resolver // result has been propagated, so sleep for a bit. grpc_core::ExecCtx::Get()->Flush(); @@ -507,7 +506,7 @@ TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) { GPR_INFO, "Client keepalive time %d should now be in sync with the server settings", expected_keepalive_time_sec); - response_generator->SetResponseSynchronously( + response_generator->SetResponse( BuildResolverResult({absl::StrCat("ipv4:", server_address2)})); grpc_core::ExecCtx::Get()->Flush(); gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); @@ -555,7 +554,7 @@ TEST_F(KeepaliveThrottlingTest, grpc_channel* channel = grpc_channel_create("fake:///", creds, &client_channel_args); grpc_channel_credentials_release(creds); - response_generator->SetResponseSynchronously( + response_generator->SetResponse( BuildResolverResult({absl::StrCat("ipv4:", server_address1), absl::StrCat("ipv4:", server_address2)})); // For a single subchannel 3 GOAWAYs would be sufficient to increase the diff --git a/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc b/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc index 7d1c1aeaa50..6382c97c99a 100644 --- a/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc +++ b/test/cpp/client/destroy_grpclb_channel_with_active_connect_stress_test.cc @@ -76,7 +76,7 @@ void TryConnectAndDestroy() { << lb_address_result.service_config.status(); lb_address_result.args = grpc_core::SetGrpcLbBalancerAddresses( grpc_core::ChannelArgs(), addresses); - response_generator->SetResponseAsync(lb_address_result); + response_generator->SetResponse(lb_address_result); grpc::ChannelArguments args; args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator.get()); diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index fa3f3f19544..c6b0de46f89 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -596,7 +596,6 @@ grpc_cc_test( "gtest", ], flaky = True, # TODO(b/150567713) - shard_count = 20, tags = [ "cpp_end2end_test", "cpp_lb_end2end_test", diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 02dad20fe0e..1674757118b 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -13,7 +13,6 @@ // limitations under the License. #include -#include #include #include #include @@ -220,13 +219,13 @@ class FakeResolverResponseGeneratorWrapper { const grpc_core::ChannelArgs& per_address_args = grpc_core::ChannelArgs()) { grpc_core::ExecCtx exec_ctx; - response_generator_->SetResponseSynchronously(BuildFakeResults( + response_generator_->SetResponse(BuildFakeResults( ipv6_only_, ports, service_config_json, per_address_args)); } void SetNextResolutionUponError(const std::vector& ports) { grpc_core::ExecCtx exec_ctx; - response_generator_->SetReresolutionResponseSynchronously( + response_generator_->SetReresolutionResponse( BuildFakeResults(ipv6_only_, ports)); } @@ -237,7 +236,7 @@ class FakeResolverResponseGeneratorWrapper { void SetResponse(grpc_core::Resolver::Result result) { grpc_core::ExecCtx exec_ctx; - response_generator_->SetResponseSynchronously(std::move(result)); + response_generator_->SetResponse(std::move(result)); } grpc_core::FakeResolverResponseGenerator* Get() const { @@ -636,14 +635,10 @@ TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) { // Note that this call also returns IDLE, since the state change has // not yet occurred; it just gets triggered by this call. EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE); - // Now that the channel is trying to connect, we should get to state + // Now that the channel is trying to connect, we should be in state // CONNECTING. - ASSERT_TRUE( - WaitForChannelState(channel.get(), [&](grpc_connectivity_state state) { - if (state == GRPC_CHANNEL_IDLE) return false; - EXPECT_EQ(state, GRPC_CHANNEL_CONNECTING); - return true; - })); + EXPECT_EQ(channel->GetState(false /* try_to_connect */), + GRPC_CHANNEL_CONNECTING); // Return a resolver result, which allows the connection attempt to proceed. response_generator.SetNextResolution(GetServersPorts()); // We should eventually transition into state READY. @@ -1305,7 +1300,6 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) { grpc_core::Resolver::Result result; result.addresses.emplace(); result.result_health_callback = [&](absl::Status status) { - gpr_log(GPR_INFO, "****** RESULT HEALTH CALLBACK *******"); EXPECT_EQ(absl::StatusCode::kUnavailable, status.code()); EXPECT_EQ("address list must not be empty", status.message()) << status; notification.Notify(); @@ -1318,8 +1312,8 @@ TEST_F(PickFirstTest, FailsEmptyResolverUpdate) { }; EXPECT_TRUE( WaitForChannelState(channel.get(), predicate, /*try_to_connect=*/true)); - // Callback should run. - notification.WaitForNotification(); + // Callback should have been run. + ASSERT_TRUE(notification.HasBeenNotified()); // Return a valid address. gpr_log(GPR_INFO, "****** SENDING NEXT RESOLVER RESULT *******"); StartServers(1); @@ -1614,7 +1608,6 @@ TEST_F(RoundRobinTest, Updates) { ports.clear(); ports.emplace_back(servers_[1]->port_); response_generator.SetNextResolution(ports); - WaitForChannelReady(channel.get()); WaitForServer(DEBUG_LOCATION, stub, 1); EXPECT_EQ(GRPC_CHANNEL_READY, channel->GetState(/*try_to_connect=*/false)); // Check LB policy name for the channel. diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index ccb2899d6c9..de0c1e78428 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -605,7 +605,7 @@ class GrpclbEnd2endTest : public ::testing::Test { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result = MakeResolverResult( balancer_address_data, backend_address_data, service_config_json); - response_generator_->SetResponseSynchronously(std::move(result)); + response_generator_->SetResponse(std::move(result)); } void SetNextReresolutionResponse( @@ -613,11 +613,9 @@ class GrpclbEnd2endTest : public ::testing::Test { const std::vector& backend_address_data = {}, const char* service_config_json = kDefaultServiceConfig) { grpc_core::ExecCtx exec_ctx; - response_generator_->WaitForResolverSet(); grpc_core::Resolver::Result result = MakeResolverResult( balancer_address_data, backend_address_data, service_config_json); - response_generator_->SetReresolutionResponseSynchronously( - std::move(result)); + response_generator_->SetReresolutionResponse(std::move(result)); } std::vector GetBackendPorts(size_t start_index = 0, diff --git a/test/cpp/end2end/rls_end2end_test.cc b/test/cpp/end2end/rls_end2end_test.cc index b43128a3ea9..c2184522a90 100644 --- a/test/cpp/end2end/rls_end2end_test.cc +++ b/test/cpp/end2end/rls_end2end_test.cc @@ -135,8 +135,7 @@ class FakeResolverResponseGeneratorWrapper { void SetNextResolution(absl::string_view service_config_json) { grpc_core::ExecCtx exec_ctx; - response_generator_->SetResponseSynchronously( - BuildFakeResults(service_config_json)); + response_generator_->SetResponse(BuildFakeResults(service_config_json)); } grpc_core::FakeResolverResponseGenerator* Get() const { diff --git a/test/cpp/end2end/service_config_end2end_test.cc b/test/cpp/end2end/service_config_end2end_test.cc index 6420080c246..16604719cbf 100644 --- a/test/cpp/end2end/service_config_end2end_test.cc +++ b/test/cpp/end2end/service_config_end2end_test.cc @@ -192,7 +192,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test { void SetNextResolutionNoServiceConfig(const std::vector& ports) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result = BuildFakeResults(ports); - response_generator_->SetResponseSynchronously(result); + response_generator_->SetResponse(result); } void SetNextResolutionValidServiceConfig(const std::vector& ports) { @@ -201,7 +201,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test { result.service_config = grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), "{}"); ASSERT_TRUE(result.service_config.ok()) << result.service_config.status(); - response_generator_->SetResponseSynchronously(result); + response_generator_->SetResponse(result); } void SetNextResolutionInvalidServiceConfig(const std::vector& ports) { @@ -209,7 +209,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test { grpc_core::Resolver::Result result = BuildFakeResults(ports); result.service_config = absl::InvalidArgumentError("error parsing service config"); - response_generator_->SetResponseSynchronously(result); + response_generator_->SetResponse(result); } void SetNextResolutionWithServiceConfig(const std::vector& ports, @@ -218,7 +218,7 @@ class ServiceConfigEnd2endTest : public ::testing::Test { grpc_core::Resolver::Result result = BuildFakeResults(ports); result.service_config = grpc_core::ServiceConfigImpl::Create(grpc_core::ChannelArgs(), svc_cfg); - response_generator_->SetResponseSynchronously(result); + response_generator_->SetResponse(result); } std::vector GetServersPorts(size_t start_index = 0) { diff --git a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc index 259b37fd476..f27ae5532cf 100644 --- a/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_type_end2end_test.cc @@ -101,7 +101,7 @@ TEST_P(LogicalDNSClusterTest, Basic) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(GetBackendPorts()); - logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously( + logical_dns_cluster_resolver_response_generator_->SetResponse( std::move(result)); } // RPCs should succeed. @@ -287,7 +287,7 @@ TEST_P(AggregateClusterTest, EdsToLogicalDns) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(GetBackendPorts(1, 2)); - logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously( + logical_dns_cluster_resolver_response_generator_->SetResponse( std::move(result)); } // Wait for traffic to go to backend 0. @@ -348,7 +348,7 @@ TEST_P(AggregateClusterTest, LogicalDnsToEds) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(GetBackendPorts(0, 1)); - logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously( + logical_dns_cluster_resolver_response_generator_->SetResponse( std::move(result)); } // Wait for traffic to go to backend 0. @@ -419,7 +419,7 @@ TEST_P(AggregateClusterTest, ReconfigEdsWhileLogicalDnsChildFails) { grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = absl::UnavailableError("injected error"); - logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously( + logical_dns_cluster_resolver_response_generator_->SetResponse( std::move(result)); } // When an RPC fails, we know the channel has seen the update. diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index d6de6167e8b..ef06120bc5b 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -210,7 +210,7 @@ TEST_P(RingHashTest, grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(GetBackendPorts()); - logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously( + logical_dns_cluster_resolver_response_generator_->SetResponse( std::move(result)); } // Inject connection delay to make this act more realistically. @@ -278,7 +278,7 @@ TEST_P(RingHashTest, grpc_core::ExecCtx exec_ctx; grpc_core::Resolver::Result result; result.addresses = CreateAddressListFromPortList(GetBackendPorts()); - logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously( + logical_dns_cluster_resolver_response_generator_->SetResponse( std::move(result)); } // Set up connection attempt injector. diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index 50f824251be..d56a19f4b7c 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -107,8 +107,7 @@ void ArgsInit(ArgsStruct* args) { grpc_pollset_init(args->pollset, &args->mu); args->pollset_set = grpc_pollset_set_create(); grpc_pollset_set_add_pollset(args->pollset_set, args->pollset); - args->lock = std::make_shared( - grpc_event_engine::experimental::GetDefaultEventEngine()); + args->lock = std::make_shared(); gpr_atm_rel_store(&args->done_atm, 0); args->channel_args = nullptr; } diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 10dc8b0b60b..69a19c9d90f 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -201,8 +201,7 @@ void ArgsInit(ArgsStruct* args) { grpc_pollset_init(args->pollset, &args->mu); args->pollset_set = grpc_pollset_set_create(); grpc_pollset_set_add_pollset(args->pollset_set, args->pollset); - args->lock = std::make_shared( - grpc_event_engine::experimental::GetDefaultEventEngine()); + args->lock = std::make_shared(); args->done = false; args->channel_args = nullptr; }