From b04aa1cf926e897f9180d8802c2c03782dcffc09 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Nov 2022 12:14:22 -0800 Subject: [PATCH] [event_engine] Move combiner executor usage to event engine (#31713) * [event_engine] Move combiner executor usage to event engine * fix * review feedback * fix * x * fix * fix --- BUILD | 4 ++- bazel/grpc_deps.bzl | 5 ++++ src/core/BUILD | 4 +-- .../filters/client_channel/config_selector.h | 1 - .../lb_policy/priority/priority.cc | 1 - .../weighted_target/weighted_target.cc | 1 - .../lb_policy/xds/xds_cluster_manager.cc | 1 - .../message_compress/compression_filter.cc | 4 +-- .../binder/transport/binder_transport.cc | 4 ++- .../binder/wire_format/wire_writer.cc | 6 +++- src/core/ext/transport/chaotic_good/frame.cc | 1 + .../chttp2/transport/chttp2_transport.cc | 5 +++- .../chttp2/transport/flow_control.cc | 1 + src/core/ext/xds/xds_route_config.cc | 1 + src/core/lib/iomgr/combiner.cc | 26 ++++++++--------- src/core/lib/iomgr/combiner.h | 5 ++-- src/cpp/ext/gcp/BUILD | 6 ++++ src/cpp/ext/gcp/observability_config.h | 1 + src/cpp/ext/gcp/observability_logging_sink.cc | 13 +++++++++ src/cpp/ext/gcp/observability_logging_sink.h | 2 ++ .../lb_policy/lb_policy_test_lib.h | 2 ++ .../lb_policy/outlier_detection_test.cc | 7 ----- .../lb_policy/pick_first_test.cc | 11 +------ test/core/end2end/BUILD | 1 + .../end2end/fixtures/http_proxy_fixture.cc | 4 ++- test/core/end2end/fuzzers/BUILD | 1 + test/core/end2end/fuzzers/client_fuzzer.cc | 23 ++++++++++++++- .../posix/traced_buffer_list_test.cc | 4 +++ test/core/iomgr/combiner_test.cc | 19 +++++++++--- .../core/transport/binder/wire_reader_test.cc | 5 +++- .../transport/chttp2/flow_control_test.cc | 1 + test/core/xds/xds_client_fuzzer.cc | 17 +++++++++++ .../xds_route_config_resource_type_test.cc | 1 - test/cpp/microbenchmarks/bm_closure.cc | 29 ++++++++++++++----- tools/distrib/fix_build_deps.py | 4 +++ 35 files changed, 159 insertions(+), 62 deletions(-) diff --git a/BUILD b/BUILD index 9ae7c053be5..4299d972d6a 100644 --- a/BUILD +++ b/BUILD @@ -996,6 +996,7 @@ grpc_cc_library( "//src/core:arena", "//src/core:channel_args_preconditioning", "//src/core:channel_stack_type", + "//src/core:default_event_engine", "//src/core:iomgr_fwd", "//src/core:iomgr_port", "//src/core:slice", @@ -3166,11 +3167,12 @@ grpc_cc_library( "//src/core:channel_init", "//src/core:channel_stack_type", "//src/core:context", - "//src/core:for_each", "//src/core:grpc_message_size_filter", "//src/core:latch", "//src/core:map_pipe", "//src/core:percent_encoding", + "//src/core:pipe", + "//src/core:promise_like", "//src/core:seq", "//src/core:slice", "//src/core:slice_buffer", diff --git a/bazel/grpc_deps.bzl b/bazel/grpc_deps.bzl index bfb319842cd..5937b0d37de 100644 --- a/bazel/grpc_deps.bzl +++ b/bazel/grpc_deps.bzl @@ -205,6 +205,11 @@ def grpc_deps(): actual = "@com_google_googleapis//google/logging/v2:logging_cc_grpc", ) + native.bind( + name = "googleapis_logging_cc_proto", + actual = "@com_google_googleapis//google/logging/v2:logging_cc_proto", + ) + if "boringssl" not in native.existing_rules(): http_archive( name = "boringssl", diff --git a/src/core/BUILD b/src/core/BUILD index f728ea088c1..8be251fc146 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3896,7 +3896,6 @@ grpc_cc_library( "lb_policy_factory", "lb_policy_registry", "pollset_set", - "ref_counted", "subchannel_interface", "time", "validation_errors", @@ -4180,7 +4179,6 @@ grpc_cc_library( "lb_policy_factory", "lb_policy_registry", "pollset_set", - "ref_counted", "subchannel_interface", "time", "validation_errors", @@ -4221,7 +4219,6 @@ grpc_cc_library( "lb_policy_factory", "lb_policy_registry", "pollset_set", - "ref_counted", "subchannel_interface", "time", "validation_errors", @@ -4776,6 +4773,7 @@ grpc_cc_library( "//:gpr", "//:gpr_platform", "//:grpc_base", + "//:grpc_public_hdrs", "//:hpack_encoder", "//:hpack_parser", ], diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h index 5112a5a466f..85e711609ca 100644 --- a/src/core/ext/filters/client_channel/config_selector.h +++ b/src/core/ext/filters/client_channel/config_selector.h @@ -31,7 +31,6 @@ #include #include -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/ref_counted.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index 4cbf45ac089..7aa2ee43fb8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -45,7 +45,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/validation_errors.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index eb1ce62d792..1484385f1e0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -45,7 +45,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/validation_errors.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index d80b0441d82..6daee327668 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -44,7 +44,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/validation_errors.h" diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc index 1deae30bc15..5e56c519f6f 100644 --- a/src/core/ext/filters/http/message_compress/compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/compression_filter.cc @@ -18,7 +18,6 @@ #include -#include #include #include #include @@ -43,9 +42,10 @@ #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/promise/context.h" -#include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/detail/promise_like.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/map_pipe.h" +#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/promise.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/try_concurrently.h" diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc index a1a2665e03a..bdb3cf7a66f 100644 --- a/src/core/ext/transport/binder/transport/binder_transport.cc +++ b/src/core/ext/transport/binder/transport/binder_transport.cc @@ -35,6 +35,7 @@ #include "src/core/ext/transport/binder/wire_format/wire_reader.h" #include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h" #include "src/core/ext/transport/binder/wire_format/wire_writer.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/error_utils.h" @@ -709,7 +710,8 @@ grpc_binder_transport::grpc_binder_transport( std::unique_ptr binder, bool is_client, std::shared_ptr security_policy) : is_client(is_client), - combiner(grpc_combiner_create()), + combiner(grpc_combiner_create( + grpc_event_engine::experimental::GetDefaultEventEngine())), state_tracker( is_client ? "binder_transport_client" : "binder_transport_server", GRPC_CHANNEL_READY), diff --git a/src/core/ext/transport/binder/wire_format/wire_writer.cc b/src/core/ext/transport/binder/wire_format/wire_writer.cc index 6d804e9dab9..1b475e2863e 100644 --- a/src/core/ext/transport/binder/wire_format/wire_writer.cc +++ b/src/core/ext/transport/binder/wire_format/wire_writer.cc @@ -25,6 +25,8 @@ #include +#include "src/core/lib/event_engine/default_event_engine.h" + #define RETURN_IF_ERROR(expr) \ do { \ const absl::Status status = (expr); \ @@ -81,7 +83,9 @@ absl::Status WriteTrailingMetadata(const Transaction& tx, } WireWriterImpl::WireWriterImpl(std::unique_ptr binder) - : binder_(std::move(binder)), combiner_(grpc_combiner_create()) {} + : binder_(std::move(binder)), + combiner_(grpc_combiner_create( + grpc_event_engine::experimental::GetDefaultEventEngine())) {} WireWriterImpl::~WireWriterImpl() { GRPC_COMBINER_UNREF(combiner_, "wire_writer_impl"); diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index ad260793d72..cb473636d39 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -25,6 +25,7 @@ #include "absl/status/statusor.h" #include +#include #include #include "src/core/lib/gprpp/bitset.h" diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 670dc05f101..897b96b9f5e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -39,6 +39,7 @@ #include "absl/types/optional.h" #include "absl/types/variant.h" +#include #include #include #include @@ -475,7 +476,9 @@ grpc_chttp2_transport::grpc_chttp2_transport( grpc_endpoint_get_peer(ep), ":client_transport"))), self_reservation( memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))), - combiner(grpc_combiner_create()), + combiner(grpc_combiner_create( + channel_args + .GetObjectRef())), state_tracker(is_client ? "client_transport" : "server_transport", GRPC_CHANNEL_READY), is_client(is_client), diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 48eae6e67b6..e1c26124bc5 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "absl/strings/str_cat.h" diff --git a/src/core/ext/xds/xds_route_config.cc b/src/core/ext/xds/xds_route_config.cc index c7d35cfc1d4..25478580839 100644 --- a/src/core/ext/xds/xds_route_config.cc +++ b/src/core/ext/xds/xds_route_config.cc @@ -68,6 +68,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/match.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy_registry.h" diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 1ad56dcf998..fa8eb3a83ba 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -28,7 +28,7 @@ #include #include "src/core/lib/gprpp/mpscq.h" -#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); @@ -49,14 +49,14 @@ static void combiner_finally_exec(grpc_core::Combiner* lock, grpc_closure* closure, grpc_error_handle error); -static void offload(void* arg, grpc_error_handle error); - -grpc_core::Combiner* grpc_combiner_create(void) { +grpc_core::Combiner* grpc_combiner_create( + std::shared_ptr + event_engine) { grpc_core::Combiner* lock = new grpc_core::Combiner(); + lock->event_engine = std::move(event_engine); gpr_ref_init(&lock->refs, 1); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, nullptr); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock)); return lock; } @@ -163,15 +163,15 @@ static void move_next() { } } -static void offload(void* arg, grpc_error_handle /*error*/) { - grpc_core::Combiner* lock = static_cast(arg); - push_last_on_exec_ctx(lock); -} - static void queue_offload(grpc_core::Combiner* lock) { move_next(); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); - grpc_core::Executor::Run(&lock->offload, absl::OkStatus()); + lock->event_engine->Run([lock] { + grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; + grpc_core::ExecCtx exec_ctx(0); + push_last_on_exec_ctx(lock); + exec_ctx.Flush(); + }); } bool grpc_combiner_continue_exec_ctx() { @@ -198,9 +198,7 @@ bool grpc_combiner_continue_exec_ctx() { // 2. the current execution context needs to finish as soon as possible // 3. the current thread is not a worker for any background poller // 4. the DEFAULT executor is threaded - if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - !grpc_iomgr_platform_is_any_background_poller_thread() && - grpc_core::Executor::IsThreadedDefault()) { + if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish()) { // this execution context wants to move on: schedule remaining work to be // picked up on the executor queue_offload(lock); diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 3b9ddfcec13..32619db5eb2 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -49,8 +49,8 @@ class Combiner { gpr_atm state; bool time_to_execute_final_list = false; grpc_closure_list final_list; - grpc_closure offload; gpr_refcount refs; + std::shared_ptr event_engine; }; } // namespace grpc_core @@ -61,7 +61,8 @@ class Combiner { // Initialize the lock, with an optional workqueue to shift load to when // necessary -grpc_core::Combiner* grpc_combiner_create(void); +grpc_core::Combiner* grpc_combiner_create( + std::shared_ptr event_engine); #ifndef NDEBUG #define GRPC_COMBINER_DEBUG_ARGS \ diff --git a/src/cpp/ext/gcp/BUILD b/src/cpp/ext/gcp/BUILD index 517862b8b2d..7245b193608 100644 --- a/src/cpp/ext/gcp/BUILD +++ b/src/cpp/ext/gcp/BUILD @@ -101,16 +101,22 @@ grpc_cc_library( "absl/strings", "absl/strings:str_format", "absl/types:optional", + "googleapis_logging_cc_proto", "googleapis_logging_grpc_service", + "protobuf_headers", ], language = "c++", visibility = ["//test:__subpackages__"], deps = [ "observability_config", + "//:gpr", "//:gpr_platform", "//:grpc++", + "//:grpc_base", "//:grpc_opencensus_plugin", "//src/core:env", + "//src/core:json", + "//src/core:time", "//src/cpp/ext/filters/logging:logging_sink", ], ) diff --git a/src/cpp/ext/gcp/observability_config.h b/src/cpp/ext/gcp/observability_config.h index a9ab6f0b4d9..0d5cc69c53f 100644 --- a/src/cpp/ext/gcp/observability_config.h +++ b/src/cpp/ext/gcp/observability_config.h @@ -19,6 +19,7 @@ #include +#include #include #include diff --git a/src/cpp/ext/gcp/observability_logging_sink.cc b/src/cpp/ext/gcp/observability_logging_sink.cc index 16ab40a109e..f7fd7155341 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.cc +++ b/src/cpp/ext/gcp/observability_logging_sink.cc @@ -23,15 +23,28 @@ #include #include +#include #include +#include + +#include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include "absl/types/optional.h" +#include "google/logging/v2/log_entry.pb.h" #include "google/logging/v2/logging.grpc.pb.h" +#include "google/logging/v2/logging.pb.h" +#include +#include #include +#include +#include +#include #include "src/core/lib/gprpp/env.h" +#include "src/core/lib/gprpp/time.h" +#include "src/core/lib/json/json.h" #include "src/cpp/ext/filters/census/open_census_call_tracer.h" namespace grpc { diff --git a/src/cpp/ext/gcp/observability_logging_sink.h b/src/cpp/ext/gcp/observability_logging_sink.h index 36d676f261c..5144c15c62c 100644 --- a/src/cpp/ext/gcp/observability_logging_sink.h +++ b/src/cpp/ext/gcp/observability_logging_sink.h @@ -27,6 +27,8 @@ #include #include +#include + #include "absl/strings/string_view.h" #include "google/logging/v2/logging.grpc.pb.h" diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 5d6c7926a8b..70653c008ad 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,7 @@ #include "absl/strings/string_view.h" #include "absl/synchronization/notification.h" #include "absl/types/optional.h" +#include "absl/types/span.h" #include "absl/types/variant.h" #include "gtest/gtest.h" diff --git a/test/core/client_channel/lb_policy/outlier_detection_test.cc b/test/core/client_channel/lb_policy/outlier_detection_test.cc index 3a368f6c4a8..80870826079 100644 --- a/test/core/client_channel/lb_policy/outlier_detection_test.cc +++ b/test/core/client_channel/lb_policy/outlier_detection_test.cc @@ -19,27 +19,20 @@ #include #include -#include #include #include -#include #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "gtest/gtest.h" #include -#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" -#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" -#include "src/core/lib/resolver/server_address.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" #include "test/core/util/test_config.h" diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index 4578648726c..fd9ace9de89 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -16,25 +16,16 @@ #include -#include -#include -#include -#include -#include - #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "gtest/gtest.h" #include -#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/iomgr/resolved_address.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/load_balancing/lb_policy.h" -#include "src/core/lib/resolver/server_address.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" #include "test/core/util/test_config.h" diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD index f1d0d72a14c..cf546811112 100644 --- a/test/core/end2end/BUILD +++ b/test/core/end2end/BUILD @@ -71,6 +71,7 @@ grpc_cc_library( "//:sockaddr_utils", "//src/core:channel_args_preconditioning", "//src/core:closure", + "//src/core:default_event_engine", "//src/core:error", "//src/core:iomgr_fwd", "//src/core:pollset_set", diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index dabc7c2185d..63538e9803d 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -41,6 +41,7 @@ #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/status_helper.h" @@ -67,7 +68,8 @@ struct grpc_end2end_http_proxy { grpc_end2end_http_proxy() : server(nullptr), channel_args(nullptr), mu(nullptr), combiner(nullptr) { gpr_ref_init(&users, 1); - combiner = grpc_combiner_create(); + combiner = grpc_combiner_create( + grpc_event_engine::experimental::GetDefaultEventEngine()); } std::string proxy_name; grpc_core::Thread thd; diff --git a/test/core/end2end/fuzzers/BUILD b/test/core/end2end/fuzzers/BUILD index 87786fcdc73..e6a948f665f 100644 --- a/test/core/end2end/fuzzers/BUILD +++ b/test/core/end2end/fuzzers/BUILD @@ -58,6 +58,7 @@ grpc_fuzzer( deps = [ "//:gpr", "//:grpc", + "//test/core/event_engine/fuzzing_event_engine", "//test/core/util:grpc_test_util", "//test/core/util:grpc_test_util_base", ], diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index c746efb0fe1..be4121ef56f 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -20,11 +20,13 @@ #include #include +#include #include #include "absl/status/statusor.h" #include +#include #include #include #include @@ -35,6 +37,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -44,11 +47,16 @@ #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/surface/event_string.h" #include "src/core/lib/transport/transport_fwd.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" #include "test/core/util/mock_endpoint.h" bool squelch = true; bool leak_check = true; +using ::grpc_event_engine::experimental::FuzzingEventEngine; +using ::grpc_event_engine::experimental::GetDefaultEventEngine; + static void discard_write(grpc_slice /*slice*/) {} static void* tag(intptr_t t) { return reinterpret_cast(t); } @@ -57,6 +65,13 @@ static void dont_log(gpr_log_func_args* /*args*/) {} extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { if (squelch) gpr_set_log_function(dont_log); + grpc_event_engine::experimental::SetEventEngineFactory([]() { + return std::make_unique( + FuzzingEventEngine::Options(), fuzzing_event_engine::Actions{}); + }); + auto engine = + std::dynamic_pointer_cast(GetDefaultEventEngine()); + FuzzingEventEngine::SetGlobalNowImplEngine(engine.get()); grpc_init(); { grpc_core::ExecCtx exec_ctx; @@ -94,6 +109,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_status_code status; grpc_slice details = grpc_empty_slice(); + engine->Tick(); + grpc_op ops[6]; memset(ops, 0, sizeof(ops)); grpc_op* op = ops; @@ -134,6 +151,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_event ev; while (true) { + engine->Tick(); grpc_core::ExecCtx::Get()->Flush(); ev = grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); @@ -149,6 +167,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { } done: + engine->FuzzingDone(); + engine->Tick(); if (requested_calls) { grpc_call_cancel(call, nullptr); } @@ -182,6 +202,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_byte_buffer_destroy(response_payload_recv); } } - grpc_shutdown(); + grpc_shutdown_blocking(); + FuzzingEventEngine::UnsetGlobalNowImplEngine(engine.get()); return 0; } diff --git a/test/core/event_engine/posix/traced_buffer_list_test.cc b/test/core/event_engine/posix/traced_buffer_list_test.cc index fbd635b2630..7bb62475bd2 100644 --- a/test/core/event_engine/posix/traced_buffer_list_test.cc +++ b/test/core/event_engine/posix/traced_buffer_list_test.cc @@ -19,7 +19,11 @@ #include "gtest/gtest.h" #include +#include +#include +#include +#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index 855592dbe23..ac53e3323d6 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -24,6 +24,7 @@ #include #include +#include "src/core/lib/event_engine/default_event_engine_factory.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/thd.h" #include "test/core/util/test_config.h" @@ -31,7 +32,11 @@ TEST(CombinerTest, TestNoOp) { gpr_log(GPR_DEBUG, "test_no_op"); grpc_core::ExecCtx exec_ctx; - GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op"); + GRPC_COMBINER_UNREF( + grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())), + "test_no_op"); } static void set_event_to_true(void* value, grpc_error_handle /*error*/) { @@ -41,7 +46,9 @@ static void set_event_to_true(void* value, grpc_error_handle /*error*/) { TEST(CombinerTest, TestExecuteOne) { gpr_log(GPR_DEBUG, "test_execute_one"); - grpc_core::Combiner* lock = grpc_combiner_create(); + grpc_core::Combiner* lock = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); gpr_event done; gpr_event_init(&done); grpc_core::ExecCtx exec_ctx; @@ -95,7 +102,9 @@ static void execute_many_loop(void* a) { TEST(CombinerTest, TestExecuteMany) { gpr_log(GPR_DEBUG, "test_execute_many"); - grpc_core::Combiner* lock = grpc_combiner_create(); + grpc_core::Combiner* lock = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); grpc_core::Thread thds[10]; thd_args ta[GPR_ARRAY_SIZE(thds)]; for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { @@ -128,7 +137,9 @@ static void add_finally(void* arg, grpc_error_handle /*error*/) { TEST(CombinerTest, TestExecuteFinally) { gpr_log(GPR_DEBUG, "test_execute_finally"); - grpc_core::Combiner* lock = grpc_combiner_create(); + grpc_core::Combiner* lock = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); grpc_core::ExecCtx exec_ctx; gpr_event_init(&got_in_finally); lock->Run(GRPC_CLOSURE_CREATE(add_finally, lock, nullptr), absl::OkStatus()); diff --git a/test/core/transport/binder/wire_reader_test.cc b/test/core/transport/binder/wire_reader_test.cc index 6225e071f57..b7d04f36916 100644 --- a/test/core/transport/binder/wire_reader_test.cc +++ b/test/core/transport/binder/wire_reader_test.cc @@ -370,5 +370,8 @@ TEST_F(WireReaderTest, ServerInitialMetadata) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); - return RUN_ALL_TESTS(); + grpc_init(); + int r = RUN_ALL_TESTS(); + grpc_shutdown(); + return r; } diff --git a/test/core/transport/chttp2/flow_control_test.cc b/test/core/transport/chttp2/flow_control_test.cc index 985ee002482..8df63929fea 100644 --- a/test/core/transport/chttp2/flow_control_test.cc +++ b/test/core/transport/chttp2/flow_control_test.cc @@ -15,6 +15,7 @@ #include "src/core/ext/transport/chttp2/transport/flow_control.h" #include +#include #include "gtest/gtest.h" diff --git a/test/core/xds/xds_client_fuzzer.cc b/test/core/xds/xds_client_fuzzer.cc index 839880b88dd..b902ad0c178 100644 --- a/test/core/xds/xds_client_fuzzer.cc +++ b/test/core/xds/xds_client_fuzzer.cc @@ -14,16 +14,33 @@ // limitations under the License. // +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "absl/types/optional.h" + +#include #include +#include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap_grpc.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_cluster.h" #include "src/core/ext/xds/xds_endpoint.h" #include "src/core/ext/xds/xds_listener.h" #include "src/core/ext/xds/xds_route_config.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/libfuzzer/libfuzzer_macro.h" +#include "src/proto/grpc/testing/xds/v3/discovery.pb.h" #include "test/core/xds/xds_client_fuzzer.pb.h" #include "test/core/xds/xds_transport_fake.h" diff --git a/test/core/xds/xds_route_config_resource_type_test.cc b/test/core/xds/xds_route_config_resource_type_test.cc index 45e30373d2c..26385037ee3 100644 --- a/test/core/xds/xds_route_config_resource_type_test.cc +++ b/test/core/xds/xds_route_config_resource_type_test.cc @@ -44,7 +44,6 @@ #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap_grpc.h" #include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_http_filters.h" #include "src/core/ext/xds/xds_resource_type.h" #include "src/core/ext/xds/xds_route_config.h" #include "src/core/lib/channel/status_util.h" diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index b322ea58cf9..6cd70e05ee2 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -24,6 +24,7 @@ #include +#include "src/core/lib/event_engine/default_event_engine_factory.h" #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" @@ -59,7 +60,9 @@ static void BM_ClosureInitAgainstExecCtx(benchmark::State& state) { BENCHMARK(BM_ClosureInitAgainstExecCtx); static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { - grpc_core::Combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); grpc_closure c; grpc_core::ExecCtx exec_ctx; for (auto _ : state) { @@ -204,7 +207,9 @@ static void BM_TryAcquireSpinlock(benchmark::State& state) { BENCHMARK(BM_TryAcquireSpinlock); static void BM_ClosureSchedOnCombiner(benchmark::State& state) { - grpc_core::Combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); grpc_closure c; GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, nullptr); grpc_core::ExecCtx exec_ctx; @@ -217,7 +222,9 @@ static void BM_ClosureSchedOnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureSchedOnCombiner); static void BM_ClosureSched2OnCombiner(benchmark::State& state) { - grpc_core::Combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); grpc_closure c1; grpc_closure c2; GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr); @@ -233,7 +240,9 @@ static void BM_ClosureSched2OnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureSched2OnCombiner); static void BM_ClosureSched3OnCombiner(benchmark::State& state) { - grpc_core::Combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create( + std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine())); grpc_closure c1; grpc_closure c2; grpc_closure c3; @@ -252,8 +261,10 @@ static void BM_ClosureSched3OnCombiner(benchmark::State& state) { BENCHMARK(BM_ClosureSched3OnCombiner); static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { - grpc_core::Combiner* combiner1 = grpc_combiner_create(); - grpc_core::Combiner* combiner2 = grpc_combiner_create(); + auto ee = std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine()); + grpc_core::Combiner* combiner1 = grpc_combiner_create(ee); + grpc_core::Combiner* combiner2 = grpc_combiner_create(ee); grpc_closure c1; grpc_closure c2; GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr); @@ -270,8 +281,10 @@ static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { BENCHMARK(BM_ClosureSched2OnTwoCombiners); static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) { - grpc_core::Combiner* combiner1 = grpc_combiner_create(); - grpc_core::Combiner* combiner2 = grpc_combiner_create(); + auto ee = std::shared_ptr( + grpc_event_engine::experimental::CreateEventEngine()); + grpc_core::Combiner* combiner1 = grpc_combiner_create(ee); + grpc_core::Combiner* combiner2 = grpc_combiner_create(ee); grpc_closure c1; grpc_closure c2; grpc_closure c3; diff --git a/tools/distrib/fix_build_deps.py b/tools/distrib/fix_build_deps.py index 0f6b51b4ae9..6abafe0ec4f 100755 --- a/tools/distrib/fix_build_deps.py +++ b/tools/distrib/fix_build_deps.py @@ -141,8 +141,12 @@ EXTERNAL_DEPS = { 'googleapis_trace_grpc_service', 'google/logging/v2/logging.grpc.pb.h': 'googleapis_logging_grpc_service', + 'google/logging/v2/logging.pb.h': + 'googleapis_logging_cc_proto', 'google/monitoring/v3/metric_service.grpc.pb.h': 'googleapis_monitoring_grpc_service', + 'google/logging/v2/log_entry.pb.h': + 'googleapis_logging_cc_proto', 'gmock/gmock.h': 'gtest', 'gtest/gtest.h':