[EventEngine] Replace Executor with EE::Run in Client Reactor (#35295)

Closes #35295

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35295 from drfloob:client-callback-reactor-ee de095c7f5c
PiperOrigin-RevId: 590983428
pull/35314/head
AJ Heller 1 year ago committed by Copybara-Service
parent def3288d93
commit a0cab8318d
  1. 1
      BUILD
  2. 4
      CMakeLists.txt
  3. 3
      Makefile
  4. 1
      Package.swift
  5. 4
      build_autogenerated.yaml
  6. 1
      gRPC-Core.podspec
  7. 1
      grpc.gemspec
  8. 29
      include/grpc/impl/call.h
  9. 1
      include/grpc/module.modulemap
  10. 28
      include/grpcpp/support/client_callback.h
  11. 1
      package.xml
  12. 16
      src/core/lib/surface/call.cc
  13. 24
      src/cpp/client/client_callback.cc
  14. 1
      tools/doxygen/Doxyfile.c++
  15. 1
      tools/doxygen/Doxyfile.c++.internal
  16. 1
      tools/doxygen/Doxyfile.core
  17. 1
      tools/doxygen/Doxyfile.core.internal

@ -236,6 +236,7 @@ GPR_PUBLIC_HDRS = [
"include/grpc/support/sync_windows.h",
"include/grpc/support/thd_id.h",
"include/grpc/support/time.h",
"include/grpc/impl/call.h",
"include/grpc/impl/codegen/atm.h",
"include/grpc/impl/codegen/atm_gcc_atomic.h",
"include/grpc/impl/codegen/atm_gcc_sync.h",

4
CMakeLists.txt generated

@ -1698,6 +1698,7 @@ if(_gRPC_PLATFORM_ANDROID)
endif()
foreach(_hdr
include/grpc/impl/call.h
include/grpc/impl/codegen/atm.h
include/grpc/impl/codegen/atm_gcc_atomic.h
include/grpc/impl/codegen/atm_gcc_sync.h
@ -2627,6 +2628,7 @@ foreach(_hdr
include/grpc/grpc_posix.h
include/grpc/grpc_security.h
include/grpc/grpc_security_constants.h
include/grpc/impl/call.h
include/grpc/impl/channel_arg_names.h
include/grpc/impl/codegen/atm.h
include/grpc/impl/codegen/atm_gcc_atomic.h
@ -3303,6 +3305,7 @@ foreach(_hdr
include/grpc/grpc_posix.h
include/grpc/grpc_security.h
include/grpc/grpc_security_constants.h
include/grpc/impl/call.h
include/grpc/impl/channel_arg_names.h
include/grpc/impl/codegen/atm.h
include/grpc/impl/codegen/atm_gcc_atomic.h
@ -5277,6 +5280,7 @@ foreach(_hdr
include/grpc/grpc_posix.h
include/grpc/grpc_security.h
include/grpc/grpc_security_constants.h
include/grpc/impl/call.h
include/grpc/impl/channel_arg_names.h
include/grpc/impl/codegen/atm.h
include/grpc/impl/codegen/atm_gcc_atomic.h

3
Makefile generated

@ -878,6 +878,7 @@ LIBGPR_SRC = \
src/core/lib/gprpp/windows/thd.cc \
PUBLIC_HEADERS_C += \
include/grpc/impl/call.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \
include/grpc/impl/codegen/atm_gcc_sync.h \
@ -1762,6 +1763,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc_posix.h \
include/grpc/grpc_security.h \
include/grpc/grpc_security_constants.h \
include/grpc/impl/call.h \
include/grpc/impl/channel_arg_names.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \
@ -2290,6 +2292,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc_posix.h \
include/grpc/grpc_security.h \
include/grpc/grpc_security_constants.h \
include/grpc/impl/call.h \
include/grpc/impl/channel_arg_names.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \

1
Package.swift generated

@ -61,6 +61,7 @@ let package = Package(
"include/grpc/grpc_posix.h",
"include/grpc/grpc_security.h",
"include/grpc/grpc_security_constants.h",
"include/grpc/impl/call.h",
"include/grpc/impl/channel_arg_names.h",
"include/grpc/impl/codegen/atm.h",
"include/grpc/impl/codegen/atm_gcc_atomic.h",

@ -16,6 +16,7 @@ libs:
build: all
language: c
public_headers:
- include/grpc/impl/call.h
- include/grpc/impl/codegen/atm.h
- include/grpc/impl/codegen/atm_gcc_atomic.h
- include/grpc/impl/codegen/atm_gcc_sync.h
@ -162,6 +163,7 @@ libs:
- include/grpc/grpc_posix.h
- include/grpc/grpc_security.h
- include/grpc/grpc_security_constants.h
- include/grpc/impl/call.h
- include/grpc/impl/channel_arg_names.h
- include/grpc/impl/codegen/atm.h
- include/grpc/impl/codegen/atm_gcc_atomic.h
@ -2102,6 +2104,7 @@ libs:
- include/grpc/grpc_posix.h
- include/grpc/grpc_security.h
- include/grpc/grpc_security_constants.h
- include/grpc/impl/call.h
- include/grpc/impl/channel_arg_names.h
- include/grpc/impl/codegen/atm.h
- include/grpc/impl/codegen/atm_gcc_atomic.h
@ -4360,6 +4363,7 @@ libs:
- include/grpc/grpc_posix.h
- include/grpc/grpc_security.h
- include/grpc/grpc_security_constants.h
- include/grpc/impl/call.h
- include/grpc/impl/channel_arg_names.h
- include/grpc/impl/codegen/atm.h
- include/grpc/impl/codegen/atm_gcc_atomic.h

1
gRPC-Core.podspec generated

@ -127,6 +127,7 @@ Pod::Spec.new do |s|
'include/grpc/grpc_posix.h',
'include/grpc/grpc_security.h',
'include/grpc/grpc_security_constants.h',
'include/grpc/impl/call.h',
'include/grpc/impl/channel_arg_names.h',
'include/grpc/impl/codegen/atm.h',
'include/grpc/impl/codegen/atm_gcc_atomic.h',

1
grpc.gemspec generated

@ -67,6 +67,7 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/grpc_posix.h )
s.files += %w( include/grpc/grpc_security.h )
s.files += %w( include/grpc/grpc_security_constants.h )
s.files += %w( include/grpc/impl/call.h )
s.files += %w( include/grpc/impl/channel_arg_names.h )
s.files += %w( include/grpc/impl/codegen/atm.h )
s.files += %w( include/grpc/impl/codegen/atm_gcc_atomic.h )

@ -0,0 +1,29 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_IMPL_CALL_H
#define GRPC_IMPL_CALL_H
#include <grpc/support/port_platform.h>
#include "absl/functional/any_invocable.h"
#include <grpc/grpc.h>
// Run a callback in the call's EventEngine.
// Internal-only
void grpc_call_run_in_event_engine(const grpc_call* call,
absl::AnyInvocable<void()> cb);
#endif /* GRPC_IMPL_CALL_H */

@ -13,6 +13,7 @@ header "byte_buffer.h"
header "grpc_posix.h"
header "grpc_security.h"
header "grpc_security_constants.h"
header "impl/call.h"
header "impl/channel_arg_names.h"
header "impl/codegen/atm.h"
header "impl/codegen/byte_buffer.h"

@ -23,6 +23,7 @@
#include <functional>
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
@ -123,15 +124,6 @@ class ClientReactor {
/// \param[in] s The status outcome of this RPC
virtual void OnDone(const grpc::Status& /*s*/) = 0;
/// InternalScheduleOnDone is not part of the API and is not meant to be
/// overridden. It is virtual to allow successful builds for certain bazel
/// build users that only want to depend on gRPC codegen headers and not the
/// full library (although this is not a generally-supported option). Although
/// the virtual call is slower than a direct call, this function is
/// heavyweight and the cost of the virtual call is not much in comparison.
/// This function may be removed or devirtualized in the future.
virtual void InternalScheduleOnDone(grpc::Status s);
/// InternalTrailersOnly is not part of the API and is not meant to be
/// overridden. It is virtual to allow successful builds for certain bazel
/// build users that only want to depend on gRPC codegen headers and not the
@ -649,11 +641,13 @@ class ClientCallbackReaderWriterImpl
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackReaderWriterImpl();
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
grpc_call_unref(call);
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
grpc_call_run_in_event_engine(
call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
grpc_call_unref(call);
}
}
}
@ -822,11 +816,13 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackReaderImpl();
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
grpc_call_unref(call);
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
grpc_call_run_in_event_engine(
call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
grpc_call_unref(call);
}
}
}
@ -1040,11 +1036,13 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackWriterImpl();
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
grpc_call_unref(call);
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
grpc_call_run_in_event_engine(
call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
grpc_call_unref(call);
}
}
}

1
package.xml generated

@ -49,6 +49,7 @@
<file baseinstalldir="/" name="include/grpc/grpc_posix.h" role="src" />
<file baseinstalldir="/" name="include/grpc/grpc_security.h" role="src" />
<file baseinstalldir="/" name="include/grpc/grpc_security_constants.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/call.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/channel_arg_names.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/atm.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/atm_gcc_atomic.h" role="src" />

@ -45,6 +45,7 @@
#include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
@ -148,6 +149,10 @@ class Call : public CppImplOf<Call, grpc_call> {
// for that functionality be invented)
virtual grpc_call_stack* call_stack() = 0;
// Return the EventEngine used for this call's async execution.
virtual grpc_event_engine::experimental::EventEngine* event_engine()
const = 0;
protected:
// The maximum number of concurrent batches possible.
// Based upon the maximum number of individually queueable ops in the batch
@ -529,6 +534,10 @@ class FilterStackCall final : public Call {
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
}
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return channel()->event_engine();
}
grpc_call_element* call_elem(size_t idx) {
return grpc_call_stack_element(call_stack(), idx);
}
@ -2049,7 +2058,7 @@ class PromiseBasedCall : public Call,
return failed_before_recv_message_.load(std::memory_order_relaxed);
}
grpc_event_engine::experimental::EventEngine* event_engine() const final {
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return channel()->event_engine();
}
@ -3795,3 +3804,8 @@ const char* grpc_call_error_to_string(grpc_call_error error) {
}
GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
}
void grpc_call_run_in_event_engine(const grpc_call* call,
absl::AnyInvocable<void()> cb) {
grpc_core::Call::FromC(call)->event_engine()->Run(std::move(cb));
}

@ -32,30 +32,6 @@
namespace grpc {
namespace internal {
void ClientReactor::InternalScheduleOnDone(grpc::Status s) {
// Unlike other uses of closure, do not Ref or Unref here since the reactor
// object's lifetime is controlled by user code.
grpc_core::ExecCtx exec_ctx;
struct ClosureWithArg {
grpc_closure closure;
ClientReactor* const reactor;
const grpc::Status status;
ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s)
: reactor(reactor_arg), status(std::move(s)) {
GRPC_CLOSURE_INIT(
&closure,
[](void* void_arg, grpc_error_handle) {
ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
arg->reactor->OnDone(arg->status);
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this, std::move(s));
grpc_core::Executor::Run(&arg->closure, absl::OkStatus());
}
bool ClientReactor::InternalTrailersOnly(const grpc_call* call) const {
return grpc_call_is_trailers_only(call);
}

@ -896,6 +896,7 @@ include/grpc/grpc_crl_provider.h \
include/grpc/grpc_posix.h \
include/grpc/grpc_security.h \
include/grpc/grpc_security_constants.h \
include/grpc/impl/call.h \
include/grpc/impl/channel_arg_names.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \

@ -896,6 +896,7 @@ include/grpc/grpc_crl_provider.h \
include/grpc/grpc_posix.h \
include/grpc/grpc_security.h \
include/grpc/grpc_security_constants.h \
include/grpc/impl/call.h \
include/grpc/impl/channel_arg_names.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \

@ -829,6 +829,7 @@ include/grpc/grpc_crl_provider.h \
include/grpc/grpc_posix.h \
include/grpc/grpc_security.h \
include/grpc/grpc_security_constants.h \
include/grpc/impl/call.h \
include/grpc/impl/channel_arg_names.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \

@ -829,6 +829,7 @@ include/grpc/grpc_crl_provider.h \
include/grpc/grpc_posix.h \
include/grpc/grpc_security.h \
include/grpc/grpc_security_constants.h \
include/grpc/impl/call.h \
include/grpc/impl/channel_arg_names.h \
include/grpc/impl/codegen/atm.h \
include/grpc/impl/codegen/atm_gcc_atomic.h \

Loading…
Cancel
Save