From 3d82c522fae1e57255506d5d610a08ed90944ca8 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 27 Mar 2024 19:50:46 +0000 Subject: [PATCH] [EventEngine] Refactor ServerCallbackCall to use EventEngine::Run (#36126) This removes two Executor::Run dependencies, and requires that all ServerCallbackCall implementations implement the new `RunAsync` method. There's one other known other implementation of ServerCallbackCall that will need to be updated. We could also support an "inefficient" path that uses the default engine (not implemented here), for all subclasses that do not want to update. As far as anyone is aware, the ServerCallbackCall class was never intended to be subclassed externally. Closes #36126 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36126 from drfloob:server-callback-on-ee 6242a78a3f6d5cd290e02e9d3ab83e1b00152524 PiperOrigin-RevId: 619621598 --- BUILD | 6 +- bazel/grpc_build_system.bzl | 6 +- .../grpcpp/impl/server_callback_handlers.h | 9 +++ include/grpcpp/server_context.h | 9 ++- include/grpcpp/support/server_callback.h | 9 +++ src/cpp/client/client_callback.cc | 8 --- src/cpp/server/server_callback.cc | 59 +++---------------- src/objective-c/BUILD | 2 +- src/objective-c/examples/BUILD | 6 +- .../grpc_objc_internal_library.bzl | 10 ++-- src/objective-c/tests/BUILD | 6 +- src/proto/grpc/testing/BUILD | 2 +- src/proto/grpc/testing/xds/v3/BUILD | 2 +- .../grpcio_tests/tests/stress/BUILD.bazel | 3 +- test/core/gprpp/BUILD | 2 +- test/core/iomgr/BUILD | 2 +- test/core/transport/chttp2/BUILD | 2 +- test/cpp/qps/BUILD | 2 +- test/cpp/qps/qps_benchmark_script.bzl | 2 +- third_party/address_sorting/BUILD | 4 +- 20 files changed, 64 insertions(+), 87 deletions(-) diff --git a/BUILD b/BUILD index fb01875cbd4..33915ce87db 100644 --- a/BUILD +++ b/BUILD @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("@bazel_skylib//lib:selects.bzl", "selects") +load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") load( "//bazel:grpc_build_system.bzl", "grpc_cc_library", @@ -22,8 +24,6 @@ load( "grpc_upb_proto_reflection_library", "python_config_settings", ) -load("@bazel_skylib//lib:selects.bzl", "selects") -load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") licenses(["reciprocal"]) @@ -2385,6 +2385,7 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/functional:any_invocable", "absl/status", "absl/status:statusor", "absl/strings", @@ -2465,6 +2466,7 @@ grpc_cc_library( hdrs = GRPCXX_HDRS, external_deps = [ "absl/base:core_headers", + "absl/functional:any_invocable", "absl/status", "absl/status:statusor", "absl/strings", diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index bf241fb8685..008bb784223 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -27,13 +27,13 @@ Contains macros used throughout the repo. """ +load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test") +load("@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", "ios_test_runner") +load("@com_google_protobuf//bazel:upb_proto_library.bzl", "upb_proto_library", "upb_proto_reflection_library") load("//bazel:cc_grpc_library.bzl", "cc_grpc_library") load("//bazel:copts.bzl", "GRPC_DEFAULT_COPTS") load("//bazel:experiments.bzl", "EXPERIMENTS", "EXPERIMENT_ENABLES", "EXPERIMENT_POLLERS") load("//bazel:test_experiments.bzl", "TEST_EXPERIMENTS", "TEST_EXPERIMENT_ENABLES", "TEST_EXPERIMENT_POLLERS") -load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test") -load("@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", "ios_test_runner") -load("@com_google_protobuf//bazel:upb_proto_library.bzl", "upb_proto_library", "upb_proto_reflection_library") # The set of pollers to test against if a test exercises polling POLLERS = ["epoll1", "poll"] diff --git a/include/grpcpp/impl/server_callback_handlers.h b/include/grpcpp/impl/server_callback_handlers.h index f8aa1d374a9..b49e17a2c16 100644 --- a/include/grpcpp/impl/server_callback_handlers.h +++ b/include/grpcpp/impl/server_callback_handlers.h @@ -19,6 +19,7 @@ #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H #include +#include #include #include #include @@ -186,6 +187,8 @@ class CallbackUnaryHandler : public grpc::internal::MethodHandler { ctx_->set_message_allocator_state(allocator_state); } + grpc_call* call() override { return call_.call(); } + /// SetupReactor binds the reactor (which also releases any queued /// operations), maybe calls OnCancel if possible/needed, and maybe marks /// the completion of the RPC. This should be the last component of the @@ -370,6 +373,8 @@ class CallbackClientStreamingHandler : public grpc::internal::MethodHandler { std::function call_requester) : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} + grpc_call* call() override { return call_.call(); } + void SetupReactor(ServerReadReactor* reactor) { reactor_.store(reactor, std::memory_order_relaxed); // The callback for this function should not be inlined because it invokes @@ -595,6 +600,8 @@ class CallbackServerStreamingHandler : public grpc::internal::MethodHandler { req_(req), call_requester_(std::move(call_requester)) {} + grpc_call* call() override { return call_.call(); } + void SetupReactor(ServerWriteReactor* reactor) { reactor_.store(reactor, std::memory_order_relaxed); // The callback for this function should not be inlined because it invokes @@ -807,6 +814,8 @@ class CallbackBidiHandler : public grpc::internal::MethodHandler { std::function call_requester) : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} + grpc_call* call() override { return call_.call(); } + void SetupReactor(ServerBidiReactor* reactor) { reactor_.store(reactor, std::memory_order_relaxed); // The callbacks for these functions should not be inlined because they diff --git a/include/grpcpp/server_context.h b/include/grpcpp/server_context.h index 243d1492054..d860be16095 100644 --- a/include/grpcpp/server_context.h +++ b/include/grpcpp/server_context.h @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -520,7 +521,9 @@ class ServerContextBase { public: TestServerCallbackUnary(ServerContextBase* ctx, std::function func) - : reactor_(ctx->DefaultReactor()), func_(std::move(func)) { + : reactor_(ctx->DefaultReactor()), + func_(std::move(func)), + call_(ctx->c_call()) { this->BindReactor(reactor_); } void Finish(grpc::Status s) override { @@ -537,12 +540,16 @@ class ServerContextBase { private: void CallOnDone() override {} + + grpc_call* call() override { return call_; } + grpc::internal::ServerReactor* reactor() override { return reactor_; } grpc::ServerUnaryReactor* const reactor_; std::atomic_bool status_set_{false}; grpc::Status status_; const std::function func_; + grpc_call* call_; }; alignas(Reactor) char default_reactor_[sizeof(Reactor)]; diff --git a/include/grpcpp/support/server_callback.h b/include/grpcpp/support/server_callback.h index 47fb03d5152..e5fd910701a 100644 --- a/include/grpcpp/support/server_callback.h +++ b/include/grpcpp/support/server_callback.h @@ -23,6 +23,9 @@ #include #include +#include "absl/functional/any_invocable.h" + +#include #include #include #include @@ -127,6 +130,12 @@ class ServerCallbackCall { private: virtual ServerReactor* reactor() = 0; + virtual grpc_call* call() = 0; + + virtual void RunAsync(absl::AnyInvocable cb) { + grpc_call_run_in_event_engine(call(), std::move(cb)); + } + // CallOnDone performs the work required at completion of the RPC: invoking // the OnDone function and doing all necessary cleanup. This function is only // ever invoked on a fully-Unref'fed ServerCallbackCall. diff --git a/src/cpp/client/client_callback.cc b/src/cpp/client/client_callback.cc index 7a4bf5ce0bd..85e3ec45699 100644 --- a/src/cpp/client/client_callback.cc +++ b/src/cpp/client/client_callback.cc @@ -15,18 +15,10 @@ // // -#include - -#include "absl/status/status.h" - #include #include #include -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/surface/call.h" namespace grpc { diff --git a/src/cpp/server/server_callback.cc b/src/cpp/server/server_callback.cc index 20d846fad64..0487296b0a1 100644 --- a/src/cpp/server/server_callback.cc +++ b/src/cpp/server/server_callback.cc @@ -15,72 +15,29 @@ // // -#include "absl/status/status.h" - #include -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/executor.h" - namespace grpc { namespace internal { void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) { if (inline_ondone) { CallOnDone(); - } else { - // Unlike other uses of closure, do not Ref or Unref here since at this - // point, all the Ref'fing and Unref'fing is done for this call. - grpc_core::ExecCtx exec_ctx; - struct ClosureWithArg { - grpc_closure closure; - ServerCallbackCall* call; - explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) { - GRPC_CLOSURE_INIT( - &closure, - [](void* void_arg, grpc_error_handle) { - ClosureWithArg* arg = static_cast(void_arg); - arg->call->CallOnDone(); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); - } - }; - ClosureWithArg* arg = new ClosureWithArg(this); - grpc_core::Executor::Run(&arg->closure, absl::OkStatus()); + return; } + RunAsync([this]() { CallOnDone(); }); } void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) { if (reactor->InternalInlineable()) { reactor->OnCancel(); - } else { - // Ref to make sure that the closure executes before the whole call gets - // destructed, and Unref within the closure. - Ref(); - grpc_core::ExecCtx exec_ctx; - struct ClosureWithArg { - grpc_closure closure; - ServerCallbackCall* call; - ServerReactor* reactor; - ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg) - : call(call_arg), reactor(reactor_arg) { - GRPC_CLOSURE_INIT( - &closure, - [](void* void_arg, grpc_error_handle) { - ClosureWithArg* arg = static_cast(void_arg); - arg->reactor->OnCancel(); - arg->call->MaybeDone(); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); - } - }; - ClosureWithArg* arg = new ClosureWithArg(this, reactor); - grpc_core::Executor::Run(&arg->closure, absl::OkStatus()); + return; } + Ref(); + RunAsync([this, reactor]() { + reactor->OnCancel(); + MaybeDone(); + }); } } // namespace internal diff --git a/src/objective-c/BUILD b/src/objective-c/BUILD index 575c963674f..45d9be3d6dc 100644 --- a/src/objective-c/BUILD +++ b/src/objective-c/BUILD @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_generate_objc_one_off_targets", "grpc_objc_library") load("@build_bazel_rules_apple//apple:resources.bzl", "apple_resource_bundle") +load("//bazel:grpc_build_system.bzl", "grpc_generate_objc_one_off_targets", "grpc_objc_library") licenses(["notice"]) diff --git a/src/objective-c/examples/BUILD b/src/objective-c/examples/BUILD index d4079012156..e844118a2b3 100644 --- a/src/objective-c/examples/BUILD +++ b/src/objective-c/examples/BUILD @@ -14,15 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("@build_bazel_rules_apple//apple:ios.bzl", "ios_application") +load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application") +load("@build_bazel_rules_apple//apple:watchos.bzl", "watchos_application", "watchos_extension") load( "//src/objective-c:grpc_objc_internal_library.bzl", "grpc_objc_examples_library", "local_objc_grpc_library", "proto_library_objc_wrapper", ) -load("@build_bazel_rules_apple//apple:ios.bzl", "ios_application") -load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application") -load("@build_bazel_rules_apple//apple:watchos.bzl", "watchos_application", "watchos_extension") licenses(["notice"]) diff --git a/src/objective-c/grpc_objc_internal_library.bzl b/src/objective-c/grpc_objc_internal_library.bzl index 5d884867e20..c5ad083b3e9 100644 --- a/src/objective-c/grpc_objc_internal_library.bzl +++ b/src/objective-c/grpc_objc_internal_library.bzl @@ -23,6 +23,11 @@ Each rule listed must be re-written for Google's internal build system, and each change must be ported from one to the other. """ +load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test") +load( + "@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", + "ios_test_runner", +) load("@rules_proto//proto:defs.bzl", "proto_library") load( "//bazel:generate_objc.bzl", @@ -32,11 +37,6 @@ load( "generate_objc_srcs", ) load("//bazel:grpc_build_system.bzl", "grpc_objc_library") -load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test") -load( - "@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", - "ios_test_runner", -) # The default device type for ios objc unit tests IOS_UNIT_TEST_DEVICE_TYPE = "iPhone 11" diff --git a/src/objective-c/tests/BUILD b/src/objective-c/tests/BUILD index d5544d6cdf2..357b9de9610 100644 --- a/src/objective-c/tests/BUILD +++ b/src/objective-c/tests/BUILD @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("@build_bazel_rules_apple//apple:macos.bzl", "macos_unit_test") +load("@build_bazel_rules_apple//apple:resources.bzl", "apple_resource_bundle") +load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application", "tvos_unit_test") load("//bazel:grpc_build_system.bzl", "grpc_sh_test") load( "//src/objective-c:grpc_objc_internal_library.bzl", @@ -22,9 +25,6 @@ load( "local_objc_grpc_library", "proto_library_objc_wrapper", ) -load("@build_bazel_rules_apple//apple:resources.bzl", "apple_resource_bundle") -load("@build_bazel_rules_apple//apple:macos.bzl", "macos_unit_test") -load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application", "tvos_unit_test") licenses(["notice"]) diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index 41ab62e76e2..0f7bbbecf63 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library") +load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") licenses(["notice"]) diff --git a/src/proto/grpc/testing/xds/v3/BUILD b/src/proto/grpc/testing/xds/v3/BUILD index ff3cfc4f7d7..073c24dd017 100644 --- a/src/proto/grpc/testing/xds/v3/BUILD +++ b/src/proto/grpc/testing/xds/v3/BUILD @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library") +load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") licenses(["notice"]) diff --git a/src/python/grpcio_tests/tests/stress/BUILD.bazel b/src/python/grpcio_tests/tests/stress/BUILD.bazel index a67592904e7..94ae6fe3804 100644 --- a/src/python/grpcio_tests/tests/stress/BUILD.bazel +++ b/src/python/grpcio_tests/tests/stress/BUILD.bazel @@ -1,3 +1,5 @@ +load("@com_github_grpc_grpc//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") + # Copyright 2021 The gRPC Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. load("@rules_proto//proto:defs.bzl", "proto_library") -load("@com_github_grpc_grpc//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library") proto_library( name = "unary_stream_benchmark_proto", diff --git a/test/core/gprpp/BUILD b/test/core/gprpp/BUILD index 596946dd345..6ec04014d73 100644 --- a/test/core/gprpp/BUILD +++ b/test/core/gprpp/BUILD @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE") +load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer") licenses(["notice"]) diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 44758b1a540..8fdb9658373 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") licenses(["notice"]) diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 8aa43fc8c96..c51a1e669b9 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE") load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") load("//test/core/util:grpc_fuzzer.bzl", "grpc_fuzzer", "grpc_proto_fuzzer") -load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE") licenses(["notice"]) diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index 3d0dbe6824a..fa210c65cff 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE") load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_py_binary") load("//test/cpp/qps:qps_benchmark_script.bzl", "json_run_localhost_batch", "qps_json_driver_batch") -load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE") licenses(["notice"]) diff --git a/test/cpp/qps/qps_benchmark_script.bzl b/test/cpp/qps/qps_benchmark_script.bzl index ccd077e0415..a82ce13bdd0 100644 --- a/test/cpp/qps/qps_benchmark_script.bzl +++ b/test/cpp/qps/qps_benchmark_script.bzl @@ -26,8 +26,8 @@ """Script to run qps benchmark.""" load("//bazel:grpc_build_system.bzl", "grpc_cc_test") -load("//test/cpp/qps:qps_json_driver_scenarios.bzl", "QPS_JSON_DRIVER_SCENARIOS") load("//test/cpp/qps:json_run_localhost_scenarios.bzl", "JSON_RUN_LOCALHOST_SCENARIOS") +load("//test/cpp/qps:qps_json_driver_scenarios.bzl", "QPS_JSON_DRIVER_SCENARIOS") def add_suffix(name): # NOTE(https://github.com/grpc/grpc/issues/24178): Add the suffix to the name diff --git a/third_party/address_sorting/BUILD b/third_party/address_sorting/BUILD index 945e7ad9c88..c3f49222055 100644 --- a/third_party/address_sorting/BUILD +++ b/third_party/address_sorting/BUILD @@ -28,6 +28,8 @@ # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. +load(":address_sorting.bzl", "address_sorting_cc_library") + package( default_visibility = ["//visibility:public"], features = [ @@ -36,8 +38,6 @@ package( ], ) -load(":address_sorting.bzl", "address_sorting_cc_library") - licenses(["notice"]) exports_files(["LICENSE"])