[channel] Add CallFactory, CallDestination (#35766)

Closes #35766

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35766 from ctiller:chan3 ea9a3260e3
PiperOrigin-RevId: 603437058
pull/35787/head
Craig Tiller 1 year ago committed by Copybara-Service
parent 354cdecc12
commit 610f439342
  1. 2
      BUILD
  2. 3
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 6
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 3
      grpc.gyp
  12. 2
      package.xml
  13. 32
      src/core/BUILD
  14. 32
      src/core/lib/surface/channel.cc
  15. 10
      src/core/lib/surface/channel.h
  16. 35
      src/core/lib/transport/call_destination.h
  17. 41
      src/core/lib/transport/call_factory.cc
  18. 56
      src/core/lib/transport/call_factory.h
  19. 1
      src/python/grpcio/grpc_core_dependencies.py
  20. 2
      tools/doxygen/Doxyfile.c++.internal
  21. 2
      tools/doxygen/Doxyfile.core.internal

@ -1531,9 +1531,9 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:call_factory",
"//src/core:call_filters",
"//src/core:call_final_info",
"//src/core:call_size_estimator",
"//src/core:call_spine",
"//src/core:cancel_callback",
"//src/core:channel_args",

3
CMakeLists.txt generated

@ -2526,6 +2526,7 @@ add_library(grpc
src/core/lib/surface/version.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/call_factory.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_size_estimator.cc
@ -3243,6 +3244,7 @@ add_library(grpc_unsecure
src/core/lib/surface/version.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/bdp_estimator.cc
src/core/lib/transport/call_factory.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_size_estimator.cc
@ -5332,6 +5334,7 @@ add_library(grpc_authorization_provider
src/core/lib/surface/validate_metadata.cc
src/core/lib/surface/version.cc
src/core/lib/transport/batch_builder.cc
src/core/lib/transport/call_factory.cc
src/core/lib/transport/call_filters.cc
src/core/lib/transport/call_final_info.cc
src/core/lib/transport/call_size_estimator.cc

2
Makefile generated

@ -1708,6 +1708,7 @@ LIBGRPC_SRC = \
src/core/lib/surface/version.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_factory.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_size_estimator.cc \
@ -2259,6 +2260,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/surface/version.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_factory.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_size_estimator.cc \

2
Package.swift generated

@ -1899,6 +1899,8 @@ let package = Package(
"src/core/lib/transport/batch_builder.h",
"src/core/lib/transport/bdp_estimator.cc",
"src/core/lib/transport/bdp_estimator.h",
"src/core/lib/transport/call_factory.cc",
"src/core/lib/transport/call_factory.h",
"src/core/lib/transport/call_filters.cc",
"src/core/lib/transport/call_filters.h",
"src/core/lib/transport/call_final_info.cc",

@ -1177,6 +1177,7 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_factory.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_size_estimator.h
@ -1982,6 +1983,7 @@ libs:
- src/core/lib/surface/version.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/call_factory.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_size_estimator.cc
@ -2630,6 +2632,7 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_factory.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_size_estimator.h
@ -3051,6 +3054,7 @@ libs:
- src/core/lib/surface/version.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/bdp_estimator.cc
- src/core/lib/transport/call_factory.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_size_estimator.cc
@ -4675,6 +4679,7 @@ libs:
- src/core/lib/surface/validate_metadata.h
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/call_factory.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_size_estimator.h
@ -4973,6 +4978,7 @@ libs:
- src/core/lib/surface/validate_metadata.cc
- src/core/lib/surface/version.cc
- src/core/lib/transport/batch_builder.cc
- src/core/lib/transport/call_factory.cc
- src/core/lib/transport/call_filters.cc
- src/core/lib/transport/call_final_info.cc
- src/core/lib/transport/call_size_estimator.cc

1
config.m4 generated

@ -836,6 +836,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/surface/version.cc \
src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/call_factory.cc \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_final_info.cc \
src/core/lib/transport/call_size_estimator.cc \

1
config.w32 generated

@ -801,6 +801,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\surface\\version.cc " +
"src\\core\\lib\\transport\\batch_builder.cc " +
"src\\core\\lib\\transport\\bdp_estimator.cc " +
"src\\core\\lib\\transport\\call_factory.cc " +
"src\\core\\lib\\transport\\call_filters.cc " +
"src\\core\\lib\\transport\\call_final_info.cc " +
"src\\core\\lib\\transport\\call_size_estimator.cc " +

2
gRPC-C++.podspec generated

@ -1281,6 +1281,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_factory.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.h',
@ -2537,6 +2538,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_factory.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.h',

3
gRPC-Core.podspec generated

@ -2008,6 +2008,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_factory.cc',
'src/core/lib/transport/call_factory.h',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.cc',
@ -3316,6 +3318,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_factory.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_size_estimator.h',

2
grpc.gemspec generated

@ -1901,6 +1901,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/batch_builder.h )
s.files += %w( src/core/lib/transport/bdp_estimator.cc )
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/call_factory.cc )
s.files += %w( src/core/lib/transport/call_factory.h )
s.files += %w( src/core/lib/transport/call_filters.cc )
s.files += %w( src/core/lib/transport/call_filters.h )
s.files += %w( src/core/lib/transport/call_final_info.cc )

3
grpc.gyp generated

@ -1022,6 +1022,7 @@
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_factory.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_size_estimator.cc',
@ -1513,6 +1514,7 @@
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_factory.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_size_estimator.cc',
@ -2284,6 +2286,7 @@
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/call_factory.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_size_estimator.cc',

2
package.xml generated

@ -1883,6 +1883,8 @@
<file baseinstalldir="/" name="src/core/lib/transport/batch_builder.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.cc" role="src" />

@ -6734,6 +6734,38 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "call_factory",
srcs = [
"lib/transport/call_factory.cc",
],
hdrs = [
"lib/transport/call_factory.h",
],
deps = [
"arena",
"call_size_estimator",
"call_spine",
"channel_args",
"ref_counted",
"resource_quota",
"//:gpr_platform",
"//:stats",
],
)
grpc_cc_library(
name = "call_destination",
hdrs = [
"lib/transport/call_destination.h",
],
deps = [
"call_spine",
"//:gpr_platform",
"//:orphanable",
],
)
grpc_cc_library(
name = "parsed_metadata",
srcs = [

@ -58,12 +58,25 @@
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/init_internally.h"
#include "src/core/lib/transport/call_factory.h"
#include "src/core/lib/transport/transport.h"
// IWYU pragma: no_include <type_traits>
namespace grpc_core {
namespace {
class NotReallyACallFactory final : public CallFactory {
public:
using CallFactory::CallFactory;
CallInitiator CreateCall(ClientMetadataHandle, Arena*) override {
Crash("NotReallyACallFactory::CreateCall should never be called");
}
};
} // namespace
Channel::Channel(bool is_client, bool is_promising, std::string target,
const ChannelArgs& channel_args,
grpc_compression_options compression_options,
@ -71,14 +84,10 @@ Channel::Channel(bool is_client, bool is_promising, std::string target,
: is_client_(is_client),
is_promising_(is_promising),
compression_options_(compression_options),
call_size_estimator_(channel_stack->call_stack_size +
grpc_call_get_initial_size_estimate()),
channelz_node_(channel_args.GetObjectRef<channelz::ChannelNode>()),
allocator_(channel_args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()),
target_(std::move(target)),
channel_stack_(std::move(channel_stack)) {
channel_stack_(std::move(channel_stack)),
call_factory_(MakeRefCounted<NotReallyACallFactory>(channel_args)) {
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,
@ -107,17 +116,6 @@ Channel::Channel(bool is_client, bool is_promising, std::string target,
};
}
Arena* Channel::CreateArena() {
const size_t initial_size = call_size_estimator_.CallSizeEstimate();
global_stats().IncrementCallInitialSize(initial_size);
return Arena::Create(initial_size, &allocator_);
}
void Channel::DestroyArena(Arena* arena) {
call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
arena->Destroy();
}
absl::StatusOr<RefCountedPtr<Channel>> Channel::CreateWithBuilder(
ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();

@ -52,10 +52,9 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/call_factory.h"
#include "src/core/lib/transport/transport.h"
/// The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so
@ -122,8 +121,8 @@ class Channel : public RefCounted<Channel>,
channelz::ChannelNode* channelz_node() const { return channelz_node_.get(); }
Arena* CreateArena();
void DestroyArena(Arena* arena);
Arena* CreateArena() { return call_factory_->CreateArena(); }
void DestroyArena(Arena* arena) { return call_factory_->DestroyArena(arena); }
absl::string_view target() const { return target_; }
bool is_client() const { return is_client_; }
@ -148,12 +147,11 @@ class Channel : public RefCounted<Channel>,
const bool is_client_;
const bool is_promising_;
const grpc_compression_options compression_options_;
CallSizeEstimator call_size_estimator_;
CallRegistrationTable registration_table_;
RefCountedPtr<channelz::ChannelNode> channelz_node_;
MemoryAllocator allocator_;
std::string target_;
const RefCountedPtr<grpc_channel_stack> channel_stack_;
const RefCountedPtr<CallFactory> call_factory_;
};
} // namespace grpc_core

@ -0,0 +1,35 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/transport/call_spine.h"
namespace grpc_core {
// CallDestination is responsible for the processing of a CallHandler.
// It might be a transport, the server API, or a subchannel on the client (for
// instance).
class CallDestination : public Orphanable {
public:
virtual void StartCall(CallHandler call_handler) = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H

@ -0,0 +1,41 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/call_factory.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/resource_quota/resource_quota.h"
namespace grpc_core {
CallFactory::CallFactory(const ChannelArgs& args)
: call_size_estimator_(1024),
allocator_(args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryOwner()) {}
Arena* CallFactory::CreateArena() {
const size_t initial_size = call_size_estimator_.CallSizeEstimate();
global_stats().IncrementCallInitialSize(initial_size);
return Arena::Create(initial_size, &allocator_);
}
void CallFactory::DestroyArena(Arena* arena) {
call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
arena->Destroy();
}
} // namespace grpc_core

@ -0,0 +1,56 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FACTORY_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FACTORY_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/transport/call_size_estimator.h"
#include "src/core/lib/transport/call_spine.h"
namespace grpc_core {
// CallFactory creates calls.
class CallFactory : public RefCounted<CallFactory> {
public:
explicit CallFactory(const ChannelArgs& args);
// Create an arena for a call.
// We do this as a separate step so that servers can create arenas without
// creating the call into it - in the case that we have a HTTP/2 rapid reset
// like attack this saves a lot of cpu time.
Arena* CreateArena();
// Destroy an arena created by CreateArena.
// Updates the call size estimator so that we always create arenas of about
// the right size.
void DestroyArena(Arena* arena);
// Create a call. The call will be created in the given arena.
// It is the CallFactory's responsibility to ensure that the CallHandler
// associated with the call is eventually handled by something (typically a
// CallDestination, but this is not strictly required).
virtual CallInitiator CreateCall(ClientMetadataHandle md, Arena* arena) = 0;
private:
CallSizeEstimator call_size_estimator_;
MemoryAllocator allocator_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FACTORY_H

@ -810,6 +810,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/surface/version.cc',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/call_factory.cc',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_final_info.cc',
'src/core/lib/transport/call_size_estimator.cc',

@ -2900,6 +2900,8 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_factory.cc \
src/core/lib/transport/call_factory.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \

@ -2681,6 +2681,8 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_factory.cc \
src/core/lib/transport/call_factory.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \

Loading…
Cancel
Save