[EventEngine] Refactor ServerCallbackCall to use EventEngine::Run (#36126)

This removes two Executor::Run dependencies, and requires that all ServerCallbackCall implementations implement the new `RunAsync` method. There's one other known other implementation of ServerCallbackCall that will need to be updated.

We could also support an "inefficient" path that uses the default engine (not implemented here), for all subclasses that do not want to update. As far as anyone is aware, the ServerCallbackCall class was never intended to be subclassed externally.

Closes #36126

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36126 from drfloob:server-callback-on-ee 6242a78a3f
PiperOrigin-RevId: 619621598
pull/35796/head
AJ Heller 10 months ago committed by Craig Tiller
parent 8013ffe965
commit 3d82c522fa
  1. 6
      BUILD
  2. 6
      bazel/grpc_build_system.bzl
  3. 9
      include/grpcpp/impl/server_callback_handlers.h
  4. 9
      include/grpcpp/server_context.h
  5. 9
      include/grpcpp/support/server_callback.h
  6. 8
      src/cpp/client/client_callback.cc
  7. 59
      src/cpp/server/server_callback.cc
  8. 2
      src/objective-c/BUILD
  9. 6
      src/objective-c/examples/BUILD
  10. 10
      src/objective-c/grpc_objc_internal_library.bzl
  11. 6
      src/objective-c/tests/BUILD
  12. 2
      src/proto/grpc/testing/BUILD
  13. 2
      src/proto/grpc/testing/xds/v3/BUILD
  14. 3
      src/python/grpcio_tests/tests/stress/BUILD.bazel
  15. 2
      test/core/gprpp/BUILD
  16. 2
      test/core/iomgr/BUILD
  17. 2
      test/core/transport/chttp2/BUILD
  18. 2
      test/cpp/qps/BUILD
  19. 2
      test/cpp/qps/qps_benchmark_script.bzl
  20. 4
      third_party/address_sorting/BUILD

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("@bazel_skylib//lib:selects.bzl", "selects")
load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
@ -22,8 +24,6 @@ load(
"grpc_upb_proto_reflection_library",
"python_config_settings",
)
load("@bazel_skylib//lib:selects.bzl", "selects")
load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
licenses(["reciprocal"])
@ -2385,6 +2385,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -2465,6 +2466,7 @@ grpc_cc_library(
hdrs = GRPCXX_HDRS,
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings",

@ -27,13 +27,13 @@
Contains macros used throughout the repo.
"""
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test")
load("@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", "ios_test_runner")
load("@com_google_protobuf//bazel:upb_proto_library.bzl", "upb_proto_library", "upb_proto_reflection_library")
load("//bazel:cc_grpc_library.bzl", "cc_grpc_library")
load("//bazel:copts.bzl", "GRPC_DEFAULT_COPTS")
load("//bazel:experiments.bzl", "EXPERIMENTS", "EXPERIMENT_ENABLES", "EXPERIMENT_POLLERS")
load("//bazel:test_experiments.bzl", "TEST_EXPERIMENTS", "TEST_EXPERIMENT_ENABLES", "TEST_EXPERIMENT_POLLERS")
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test")
load("@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl", "ios_test_runner")
load("@com_google_protobuf//bazel:upb_proto_library.bzl", "upb_proto_library", "upb_proto_reflection_library")
# The set of pollers to test against if a test exercises polling
POLLERS = ["epoll1", "poll"]

@ -19,6 +19,7 @@
#define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
@ -186,6 +187,8 @@ class CallbackUnaryHandler : public grpc::internal::MethodHandler {
ctx_->set_message_allocator_state(allocator_state);
}
grpc_call* call() override { return call_.call(); }
/// SetupReactor binds the reactor (which also releases any queued
/// operations), maybe calls OnCancel if possible/needed, and maybe marks
/// the completion of the RPC. This should be the last component of the
@ -370,6 +373,8 @@ class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
std::function<void()> call_requester)
: ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
grpc_call* call() override { return call_.call(); }
void SetupReactor(ServerReadReactor<RequestType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
// The callback for this function should not be inlined because it invokes
@ -595,6 +600,8 @@ class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
req_(req),
call_requester_(std::move(call_requester)) {}
grpc_call* call() override { return call_.call(); }
void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
// The callback for this function should not be inlined because it invokes
@ -807,6 +814,8 @@ class CallbackBidiHandler : public grpc::internal::MethodHandler {
std::function<void()> call_requester)
: ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
grpc_call* call() override { return call_.call(); }
void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
reactor_.store(reactor, std::memory_order_relaxed);
// The callbacks for these functions should not be inlined because they

@ -29,6 +29,7 @@
#include <vector>
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/impl/compression_types.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
@ -520,7 +521,9 @@ class ServerContextBase {
public:
TestServerCallbackUnary(ServerContextBase* ctx,
std::function<void(grpc::Status)> func)
: reactor_(ctx->DefaultReactor()), func_(std::move(func)) {
: reactor_(ctx->DefaultReactor()),
func_(std::move(func)),
call_(ctx->c_call()) {
this->BindReactor(reactor_);
}
void Finish(grpc::Status s) override {
@ -537,12 +540,16 @@ class ServerContextBase {
private:
void CallOnDone() override {}
grpc_call* call() override { return call_; }
grpc::internal::ServerReactor* reactor() override { return reactor_; }
grpc::ServerUnaryReactor* const reactor_;
std::atomic_bool status_set_{false};
grpc::Status status_;
const std::function<void(grpc::Status s)> func_;
grpc_call* call_;
};
alignas(Reactor) char default_reactor_[sizeof(Reactor)];

@ -23,6 +23,9 @@
#include <functional>
#include <type_traits>
#include "absl/functional/any_invocable.h"
#include <grpc/impl/call.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/sync.h>
@ -127,6 +130,12 @@ class ServerCallbackCall {
private:
virtual ServerReactor* reactor() = 0;
virtual grpc_call* call() = 0;
virtual void RunAsync(absl::AnyInvocable<void()> cb) {
grpc_call_run_in_event_engine(call(), std::move(cb));
}
// CallOnDone performs the work required at completion of the RPC: invoking
// the OnDone function and doing all necessary cleanup. This function is only
// ever invoked on a fully-Unref'fed ServerCallbackCall.

@ -15,18 +15,10 @@
//
//
#include <utility>
#include "absl/status/status.h"
#include <grpc/grpc.h>
#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/surface/call.h"
namespace grpc {

@ -15,72 +15,29 @@
//
//
#include "absl/status/status.h"
#include <grpcpp/support/server_callback.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
namespace grpc {
namespace internal {
void ServerCallbackCall::ScheduleOnDone(bool inline_ondone) {
if (inline_ondone) {
CallOnDone();
} else {
// Unlike other uses of closure, do not Ref or Unref here since at this
// point, all the Ref'fing and Unref'fing is done for this call.
grpc_core::ExecCtx exec_ctx;
struct ClosureWithArg {
grpc_closure closure;
ServerCallbackCall* call;
explicit ClosureWithArg(ServerCallbackCall* call_arg) : call(call_arg) {
GRPC_CLOSURE_INIT(
&closure,
[](void* void_arg, grpc_error_handle) {
ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
arg->call->CallOnDone();
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this);
grpc_core::Executor::Run(&arg->closure, absl::OkStatus());
return;
}
RunAsync([this]() { CallOnDone(); });
}
void ServerCallbackCall::CallOnCancel(ServerReactor* reactor) {
if (reactor->InternalInlineable()) {
reactor->OnCancel();
} else {
// Ref to make sure that the closure executes before the whole call gets
// destructed, and Unref within the closure.
Ref();
grpc_core::ExecCtx exec_ctx;
struct ClosureWithArg {
grpc_closure closure;
ServerCallbackCall* call;
ServerReactor* reactor;
ClosureWithArg(ServerCallbackCall* call_arg, ServerReactor* reactor_arg)
: call(call_arg), reactor(reactor_arg) {
GRPC_CLOSURE_INIT(
&closure,
[](void* void_arg, grpc_error_handle) {
ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
arg->reactor->OnCancel();
arg->call->MaybeDone();
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this, reactor);
grpc_core::Executor::Run(&arg->closure, absl::OkStatus());
return;
}
Ref();
RunAsync([this, reactor]() {
reactor->OnCancel();
MaybeDone();
});
}
} // namespace internal

@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_generate_objc_one_off_targets", "grpc_objc_library")
load("@build_bazel_rules_apple//apple:resources.bzl", "apple_resource_bundle")
load("//bazel:grpc_build_system.bzl", "grpc_generate_objc_one_off_targets", "grpc_objc_library")
licenses(["notice"])

@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_application")
load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application")
load("@build_bazel_rules_apple//apple:watchos.bzl", "watchos_application", "watchos_extension")
load(
"//src/objective-c:grpc_objc_internal_library.bzl",
"grpc_objc_examples_library",
"local_objc_grpc_library",
"proto_library_objc_wrapper",
)
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_application")
load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application")
load("@build_bazel_rules_apple//apple:watchos.bzl", "watchos_application", "watchos_extension")
licenses(["notice"])

@ -23,6 +23,11 @@ Each rule listed must be re-written for Google's internal build system, and
each change must be ported from one to the other.
"""
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test")
load(
"@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl",
"ios_test_runner",
)
load("@rules_proto//proto:defs.bzl", "proto_library")
load(
"//bazel:generate_objc.bzl",
@ -32,11 +37,6 @@ load(
"generate_objc_srcs",
)
load("//bazel:grpc_build_system.bzl", "grpc_objc_library")
load("@build_bazel_rules_apple//apple:ios.bzl", "ios_unit_test")
load(
"@build_bazel_rules_apple//apple/testing/default_runner:ios_test_runner.bzl",
"ios_test_runner",
)
# The default device type for ios objc unit tests
IOS_UNIT_TEST_DEVICE_TYPE = "iPhone 11"

@ -14,6 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("@build_bazel_rules_apple//apple:macos.bzl", "macos_unit_test")
load("@build_bazel_rules_apple//apple:resources.bzl", "apple_resource_bundle")
load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application", "tvos_unit_test")
load("//bazel:grpc_build_system.bzl", "grpc_sh_test")
load(
"//src/objective-c:grpc_objc_internal_library.bzl",
@ -22,9 +25,6 @@ load(
"local_objc_grpc_library",
"proto_library_objc_wrapper",
)
load("@build_bazel_rules_apple//apple:resources.bzl", "apple_resource_bundle")
load("@build_bazel_rules_apple//apple:macos.bzl", "macos_unit_test")
load("@build_bazel_rules_apple//apple:tvos.bzl", "tvos_application", "tvos_unit_test")
licenses(["notice"])

@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
licenses(["notice"])

@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
licenses(["notice"])

@ -1,3 +1,5 @@
load("@com_github_grpc_grpc//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -12,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@com_github_grpc_grpc//bazel:python_rules.bzl", "py_grpc_library", "py_proto_library")
proto_library(
name = "unary_stream_benchmark_proto",

@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE")
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//test/core/util:grpc_fuzzer.bzl", "grpc_proto_fuzzer")
licenses(["notice"])

@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
licenses(["notice"])

@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE")
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//test/core/util:grpc_fuzzer.bzl", "grpc_fuzzer", "grpc_proto_fuzzer")
load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE")
licenses(["notice"])

@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE")
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_py_binary")
load("//test/cpp/qps:qps_benchmark_script.bzl", "json_run_localhost_batch", "qps_json_driver_batch")
load("//bazel:custom_exec_properties.bzl", "LARGE_MACHINE")
licenses(["notice"])

@ -26,8 +26,8 @@
"""Script to run qps benchmark."""
load("//bazel:grpc_build_system.bzl", "grpc_cc_test")
load("//test/cpp/qps:qps_json_driver_scenarios.bzl", "QPS_JSON_DRIVER_SCENARIOS")
load("//test/cpp/qps:json_run_localhost_scenarios.bzl", "JSON_RUN_LOCALHOST_SCENARIOS")
load("//test/cpp/qps:qps_json_driver_scenarios.bzl", "QPS_JSON_DRIVER_SCENARIOS")
def add_suffix(name):
# NOTE(https://github.com/grpc/grpc/issues/24178): Add the suffix to the name

@ -28,6 +28,8 @@
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
load(":address_sorting.bzl", "address_sorting_cc_library")
package(
default_visibility = ["//visibility:public"],
features = [
@ -36,8 +38,6 @@ package(
],
)
load(":address_sorting.bzl", "address_sorting_cc_library")
licenses(["notice"])
exports_files(["LICENSE"])

Loading…
Cancel
Save