From fb14685fd6a53c8e1404341f7f2c39a96bfaa289 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 29 Jan 2024 09:47:00 -0800 Subject: [PATCH] [transport] Move call spine related types into their own file (#35689) Part of a continued hygiene effort for transport.h (pure code movement, no refactoring at this time) Closes #35689 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35689 from ctiller:channel-ez 3366310cc3370916f6ebc439b0c47131ada49e3d PiperOrigin-RevId: 602416330 --- BUILD | 2 +- CMakeLists.txt | 3 + Makefile | 2 + Package.swift | 2 + build_autogenerated.yaml | 6 + config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 3 + package.xml | 2 + src/core/BUILD | 24 ++ src/core/lib/transport/call_spine.cc | 103 +++++ src/core/lib/transport/call_spine.h | 418 ++++++++++++++++++++ src/core/lib/transport/transport.cc | 86 ---- src/core/lib/transport/transport.h | 387 +----------------- src/python/grpcio/grpc_core_dependencies.py | 1 + tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 20 files changed, 579 insertions(+), 473 deletions(-) create mode 100644 src/core/lib/transport/call_spine.cc create mode 100644 src/core/lib/transport/call_spine.h diff --git a/BUILD b/BUILD index 13f33a18cf7..7c2afe0697a 100644 --- a/BUILD +++ b/BUILD @@ -1533,6 +1533,7 @@ grpc_cc_library( "//src/core:bitset", "//src/core:call_filters", "//src/core:call_final_info", + "//src/core:call_spine", "//src/core:cancel_callback", "//src/core:channel_args", "//src/core:channel_args_endpoint_config", @@ -1584,7 +1585,6 @@ grpc_cc_library( "//src/core:pollset_set", "//src/core:posix_event_engine_base_hdrs", "//src/core:posix_event_engine_endpoint", - "//src/core:prioritized_race", "//src/core:promise_status", "//src/core:race", "//src/core:random_early_detection", diff --git a/CMakeLists.txt b/CMakeLists.txt index 31334215ff9..539fb5cb73e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2528,6 +2528,7 @@ add_library(grpc src/core/lib/transport/bdp_estimator.cc src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc + src/core/lib/transport/call_spine.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/handshaker.cc @@ -3243,6 +3244,7 @@ add_library(grpc_unsecure src/core/lib/transport/bdp_estimator.cc src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc + src/core/lib/transport/call_spine.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/handshaker.cc @@ -5330,6 +5332,7 @@ add_library(grpc_authorization_provider src/core/lib/transport/batch_builder.cc src/core/lib/transport/call_filters.cc src/core/lib/transport/call_final_info.cc + src/core/lib/transport/call_spine.cc src/core/lib/transport/connectivity_state.cc src/core/lib/transport/error_utils.cc src/core/lib/transport/handshaker.cc diff --git a/Makefile b/Makefile index a07741e841f..b857b96f504 100644 --- a/Makefile +++ b/Makefile @@ -1710,6 +1710,7 @@ LIBGRPC_SRC = \ src/core/lib/transport/bdp_estimator.cc \ src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_final_info.cc \ + src/core/lib/transport/call_spine.cc \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/error_utils.cc \ src/core/lib/transport/handshaker.cc \ @@ -2259,6 +2260,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/transport/bdp_estimator.cc \ src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_final_info.cc \ + src/core/lib/transport/call_spine.cc \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/error_utils.cc \ src/core/lib/transport/handshaker.cc \ diff --git a/Package.swift b/Package.swift index cf59d4d2e6f..16a12d054f8 100644 --- a/Package.swift +++ b/Package.swift @@ -1903,6 +1903,8 @@ let package = Package( "src/core/lib/transport/call_filters.h", "src/core/lib/transport/call_final_info.cc", "src/core/lib/transport/call_final_info.h", + "src/core/lib/transport/call_spine.cc", + "src/core/lib/transport/call_spine.h", "src/core/lib/transport/connectivity_state.cc", "src/core/lib/transport/connectivity_state.h", "src/core/lib/transport/custom_metadata.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 52a7b9a5c87..634658fae83 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1179,6 +1179,7 @@ libs: - src/core/lib/transport/bdp_estimator.h - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h + - src/core/lib/transport/call_spine.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -1982,6 +1983,7 @@ libs: - src/core/lib/transport/bdp_estimator.cc - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc + - src/core/lib/transport/call_spine.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/handshaker.cc @@ -2628,6 +2630,7 @@ libs: - src/core/lib/transport/bdp_estimator.h - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h + - src/core/lib/transport/call_spine.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -3047,6 +3050,7 @@ libs: - src/core/lib/transport/bdp_estimator.cc - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc + - src/core/lib/transport/call_spine.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/handshaker.cc @@ -4669,6 +4673,7 @@ libs: - src/core/lib/transport/batch_builder.h - src/core/lib/transport/call_filters.h - src/core/lib/transport/call_final_info.h + - src/core/lib/transport/call_spine.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h - src/core/lib/transport/error_utils.h @@ -4965,6 +4970,7 @@ libs: - src/core/lib/transport/batch_builder.cc - src/core/lib/transport/call_filters.cc - src/core/lib/transport/call_final_info.cc + - src/core/lib/transport/call_spine.cc - src/core/lib/transport/connectivity_state.cc - src/core/lib/transport/error_utils.cc - src/core/lib/transport/handshaker.cc diff --git a/config.m4 b/config.m4 index 1538009a4f4..89713d1a315 100644 --- a/config.m4 +++ b/config.m4 @@ -838,6 +838,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/transport/bdp_estimator.cc \ src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_final_info.cc \ + src/core/lib/transport/call_spine.cc \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/error_utils.cc \ src/core/lib/transport/handshaker.cc \ diff --git a/config.w32 b/config.w32 index 95fb787dae8..0b6600482bd 100644 --- a/config.w32 +++ b/config.w32 @@ -803,6 +803,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\transport\\bdp_estimator.cc " + "src\\core\\lib\\transport\\call_filters.cc " + "src\\core\\lib\\transport\\call_final_info.cc " + + "src\\core\\lib\\transport\\call_spine.cc " + "src\\core\\lib\\transport\\connectivity_state.cc " + "src\\core\\lib\\transport\\error_utils.cc " + "src\\core\\lib\\transport\\handshaker.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 57ebb666793..cb91126225b 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -1283,6 +1283,7 @@ Pod::Spec.new do |s| 'src/core/lib/transport/bdp_estimator.h', 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.h', + 'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/error_utils.h', @@ -2537,6 +2538,7 @@ Pod::Spec.new do |s| 'src/core/lib/transport/bdp_estimator.h', 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.h', + 'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/error_utils.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1193cdeaeda..8c5a7ffec9d 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -2012,6 +2012,8 @@ Pod::Spec.new do |s| 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.cc', 'src/core/lib/transport/call_final_info.h', + 'src/core/lib/transport/call_spine.cc', + 'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', @@ -3314,6 +3316,7 @@ Pod::Spec.new do |s| 'src/core/lib/transport/bdp_estimator.h', 'src/core/lib/transport/call_filters.h', 'src/core/lib/transport/call_final_info.h', + 'src/core/lib/transport/call_spine.h', 'src/core/lib/transport/connectivity_state.h', 'src/core/lib/transport/custom_metadata.h', 'src/core/lib/transport/error_utils.h', diff --git a/grpc.gemspec b/grpc.gemspec index e3e58357291..dd2024bb35e 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1905,6 +1905,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/transport/call_filters.h ) s.files += %w( src/core/lib/transport/call_final_info.cc ) s.files += %w( src/core/lib/transport/call_final_info.h ) + s.files += %w( src/core/lib/transport/call_spine.cc ) + s.files += %w( src/core/lib/transport/call_spine.h ) s.files += %w( src/core/lib/transport/connectivity_state.cc ) s.files += %w( src/core/lib/transport/connectivity_state.h ) s.files += %w( src/core/lib/transport/custom_metadata.h ) diff --git a/grpc.gyp b/grpc.gyp index 4ffc196c1d6..6a86620aeee 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1024,6 +1024,7 @@ 'src/core/lib/transport/bdp_estimator.cc', 'src/core/lib/transport/call_filters.cc', 'src/core/lib/transport/call_final_info.cc', + 'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/error_utils.cc', 'src/core/lib/transport/handshaker.cc', @@ -1513,6 +1514,7 @@ 'src/core/lib/transport/bdp_estimator.cc', 'src/core/lib/transport/call_filters.cc', 'src/core/lib/transport/call_final_info.cc', + 'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/error_utils.cc', 'src/core/lib/transport/handshaker.cc', @@ -2282,6 +2284,7 @@ 'src/core/lib/transport/batch_builder.cc', 'src/core/lib/transport/call_filters.cc', 'src/core/lib/transport/call_final_info.cc', + 'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/error_utils.cc', 'src/core/lib/transport/handshaker.cc', diff --git a/package.xml b/package.xml index 0e91f6f4f56..75bfae51bd7 100644 --- a/package.xml +++ b/package.xml @@ -1887,6 +1887,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 0b2e72f92d4..5ff1b119925 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6788,6 +6788,30 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "call_spine", + srcs = [ + "lib/transport/call_spine.cc", + ], + hdrs = [ + "lib/transport/call_spine.h", + ], + deps = [ + "1999", + "for_each", + "if", + "latch", + "message", + "metadata", + "pipe", + "prioritized_race", + "promise_status", + "status_flag", + "try_seq", + "//:gpr", + ], +) + grpc_cc_library( name = "metadata_batch", srcs = [ diff --git a/src/core/lib/transport/call_spine.cc b/src/core/lib/transport/call_spine.cc new file mode 100644 index 00000000000..78e2aca797c --- /dev/null +++ b/src/core/lib/transport/call_spine.cc @@ -0,0 +1,103 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/transport/call_spine.h" + +namespace grpc_core { + +void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, + ClientMetadataHandle client_initial_metadata) { + // Send initial metadata. + call_initiator.SpawnGuarded( + "send_initial_metadata", + [client_initial_metadata = std::move(client_initial_metadata), + call_initiator]() mutable { + return call_initiator.PushClientInitialMetadata( + std::move(client_initial_metadata)); + }); + // Read messages from handler into initiator. + call_handler.SpawnGuarded("read_messages", [call_handler, + call_initiator]() mutable { + return Seq(ForEach(OutgoingMessages(call_handler), + [call_initiator](MessageHandle msg) mutable { + // Need to spawn a job into the initiator's activity to + // push the message in. + return call_initiator.SpawnWaitable( + "send_message", + [msg = std::move(msg), call_initiator]() mutable { + return call_initiator.CancelIfFails( + call_initiator.PushMessage(std::move(msg))); + }); + }), + [call_initiator](StatusFlag result) mutable { + call_initiator.SpawnInfallible( + "finish-downstream", [call_initiator, result]() mutable { + if (result.ok()) { + call_initiator.FinishSends(); + } else { + call_initiator.Cancel(); + } + return Empty{}; + }); + return result; + }); + }); + call_initiator.SpawnInfallible("read_the_things", [call_initiator, + call_handler]() mutable { + return Seq( + call_initiator.CancelIfFails(TrySeq( + call_initiator.PullServerInitialMetadata(), + [call_handler, + call_initiator](absl::optional md) mutable { + const bool has_md = md.has_value(); + call_handler.SpawnGuarded( + "recv_initial_metadata", + [md = std::move(md), call_handler]() mutable { + return call_handler.PushServerInitialMetadata( + std::move(md)); + }); + return If( + has_md, + ForEach(OutgoingMessages(call_initiator), + [call_handler](MessageHandle msg) mutable { + return call_handler.SpawnWaitable( + "recv_message", + [msg = std::move(msg), call_handler]() mutable { + return call_handler.CancelIfFails( + call_handler.PushMessage(std::move(msg))); + }); + }), + []() -> StatusFlag { return Success{}; }); + })), + call_initiator.PullServerTrailingMetadata(), + [call_handler](ServerMetadataHandle md) mutable { + call_handler.SpawnGuarded( + "recv_trailing_metadata", + [md = std::move(md), call_handler]() mutable { + return call_handler.PushServerTrailingMetadata(std::move(md)); + }); + return Empty{}; + }); + }); +} + +CallInitiatorAndHandler MakeCall( + grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena) { + auto spine = CallSpine::Create(event_engine, arena); + return {CallInitiator(spine), CallHandler(spine)}; +} + +} // namespace grpc_core diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h new file mode 100644 index 00000000000..d25ba239f48 --- /dev/null +++ b/src/core/lib/transport/call_spine.h @@ -0,0 +1,418 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_SPINE_H +#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_SPINE_H + +#include + +#include + +#include "src/core/lib/promise/detail/status.h" +#include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/if.h" +#include "src/core/lib/promise/latch.h" +#include "src/core/lib/promise/party.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/prioritized_race.h" +#include "src/core/lib/promise/status_flag.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/transport/message.h" +#include "src/core/lib/transport/metadata.h" + +namespace grpc_core { + +// The common middle part of a call - a reference is held by each of +// CallInitiator and CallHandler - which provide interfaces that are appropriate +// for each side of a call. +// The spine will ultimately host the pipes, filters, and context for one part +// of a call: ie top-half client channel, sub channel call, server call. +// TODO(ctiller): eventually drop this when we don't need to reference into +// legacy promise calls anymore +class CallSpineInterface { + public: + virtual ~CallSpineInterface() = default; + virtual Pipe& client_initial_metadata() = 0; + virtual Pipe& server_initial_metadata() = 0; + virtual Pipe& client_to_server_messages() = 0; + virtual Pipe& server_to_client_messages() = 0; + virtual Pipe& server_trailing_metadata() = 0; + virtual Latch& cancel_latch() = 0; + // Add a callback to be called when server trailing metadata is received. + void OnDone(absl::AnyInvocable fn) { + if (on_done_ == nullptr) { + on_done_ = std::move(fn); + return; + } + on_done_ = [first = std::move(fn), next = std::move(on_done_)]() mutable { + first(); + next(); + }; + } + void CallOnDone() { + if (on_done_ != nullptr) std::exchange(on_done_, nullptr)(); + } + virtual Party& party() = 0; + virtual void IncrementRefCount() = 0; + virtual void Unref() = 0; + + // Cancel the call with the given metadata. + // Regarding the `MUST_USE_RESULT absl::nullopt_t`: + // Most cancellation calls right now happen in pipe interceptors; + // there `nullopt` indicates terminate processing of this pipe and close with + // error. + // It's convenient then to have the Cancel operation (setting the latch to + // terminate the call) be the last thing that occurs in a pipe interceptor, + // and this construction supports that (and has helped the author not write + // some bugs). + GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) { + GPR_DEBUG_ASSERT(GetContext() == &party()); + auto& c = cancel_latch(); + if (c.is_set()) return absl::nullopt; + c.Set(std::move(metadata)); + CallOnDone(); + client_initial_metadata().sender.CloseWithError(); + server_initial_metadata().sender.CloseWithError(); + client_to_server_messages().sender.CloseWithError(); + server_to_client_messages().sender.CloseWithError(); + return absl::nullopt; + } + + auto WaitForCancel() { + GPR_DEBUG_ASSERT(GetContext() == &party()); + return cancel_latch().Wait(); + } + + // Wrap a promise so that if it returns failure it automatically cancels + // the rest of the call. + // The resulting (returned) promise will resolve to Empty. + template + auto CancelIfFails(Promise promise) { + GPR_DEBUG_ASSERT(GetContext() == &party()); + using P = promise_detail::PromiseLike; + using ResultType = typename P::Result; + return Map(std::move(promise), [this](ResultType r) { + if (!IsStatusOk(r)) { + std::ignore = Cancel(StatusCast(r)); + } + return r; + }); + } + + // Spawn a promise that returns Empty{} and save some boilerplate handling + // that detail. + template + void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { + party().Spawn(name, std::move(promise_factory), [](Empty) {}); + } + + // Spawn a promise that returns some status-like type; if the status + // represents failure automatically cancel the rest of the call. + template + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { + using FactoryType = + promise_detail::OncePromiseFactory; + using PromiseType = typename FactoryType::Promise; + using ResultType = typename PromiseType::Result; + static_assert( + std::is_same()))>::value, + "SpawnGuarded promise must return a status-like object"); + party().Spawn(name, std::move(promise_factory), [this](ResultType r) { + if (!IsStatusOk(r)) { + if (grpc_trace_promise_primitives.enabled()) { + gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s", + r.ToString().c_str()); + } + std::ignore = Cancel(StatusCast(std::move(r))); + } + }); + } + + private: + absl::AnyInvocable on_done_{nullptr}; +}; + +class CallSpine final : public CallSpineInterface, public Party { + public: + static RefCountedPtr Create( + grpc_event_engine::experimental::EventEngine* event_engine, + Arena* arena) { + return RefCountedPtr(arena->New(event_engine, arena)); + } + + Pipe& client_initial_metadata() override { + return client_initial_metadata_; + } + Pipe& server_initial_metadata() override { + return server_initial_metadata_; + } + Pipe& client_to_server_messages() override { + return client_to_server_messages_; + } + Pipe& server_to_client_messages() override { + return server_to_client_messages_; + } + Pipe& server_trailing_metadata() override { + return server_trailing_metadata_; + } + Latch& cancel_latch() override { return cancel_latch_; } + Party& party() override { return *this; } + void IncrementRefCount() override { Party::IncrementRefCount(); } + void Unref() override { Party::Unref(); } + + private: + friend class Arena; + CallSpine(grpc_event_engine::experimental::EventEngine* event_engine, + Arena* arena) + : Party(arena, 1), event_engine_(event_engine) {} + + class ScopedContext : public ScopedActivity, + public promise_detail::Context { + public: + explicit ScopedContext(CallSpine* spine) + : ScopedActivity(&spine->party()), Context(spine->arena()) {} + }; + + bool RunParty() override { + ScopedContext context(this); + return Party::RunParty(); + } + + void PartyOver() override { + Arena* a = arena(); + { + ScopedContext context(this); + CancelRemainingParticipants(); + a->DestroyManagedNewObjects(); + } + this->~CallSpine(); + a->Destroy(); + } + + grpc_event_engine::experimental::EventEngine* event_engine() const override { + return event_engine_; + } + + // Initial metadata from client to server + Pipe client_initial_metadata_{arena()}; + // Initial metadata from server to client + Pipe server_initial_metadata_{arena()}; + // Messages travelling from the application to the transport. + Pipe client_to_server_messages_{arena()}; + // Messages travelling from the transport to the application. + Pipe server_to_client_messages_{arena()}; + // Trailing metadata from server to client + Pipe server_trailing_metadata_{arena()}; + // Latch that can be set to terminate the call + Latch cancel_latch_; + // Event engine associated with this call + grpc_event_engine::experimental::EventEngine* const event_engine_; +}; + +class CallInitiator { + public: + explicit CallInitiator(RefCountedPtr spine) + : spine_(std::move(spine)) {} + + auto PushClientInitialMetadata(ClientMetadataHandle md) { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return Map(spine_->client_initial_metadata().sender.Push(std::move(md)), + [](bool ok) { return StatusFlag(ok); }); + } + + auto PullServerInitialMetadata() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return Map(spine_->server_initial_metadata().receiver.Next(), + [](NextResult md) + -> ValueOrFailure> { + if (!md.has_value()) { + if (md.cancelled()) return Failure{}; + return absl::optional(); + } + return absl::optional(std::move(*md)); + }); + } + + auto PullServerTrailingMetadata() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return PrioritizedRace( + Map(spine_->server_trailing_metadata().receiver.Next(), + [spine = spine_]( + NextResult md) -> ServerMetadataHandle { + GPR_ASSERT(md.has_value()); + return std::move(*md); + }), + spine_->WaitForCancel()); + } + + auto PullMessage() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return spine_->server_to_client_messages().receiver.Next(); + } + + auto PushMessage(MessageHandle message) { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return Map( + spine_->client_to_server_messages().sender.Push(std::move(message)), + [](bool r) { return StatusFlag(r); }); + } + + void FinishSends() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + spine_->client_to_server_messages().sender.Close(); + } + + template + auto CancelIfFails(Promise promise) { + return spine_->CancelIfFails(std::move(promise)); + } + + void Cancel() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + std::ignore = + spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError())); + } + + template + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { + spine_->SpawnGuarded(name, std::move(promise_factory)); + } + + template + void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { + spine_->SpawnInfallible(name, std::move(promise_factory)); + } + + template + auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) { + return spine_->party().SpawnWaitable(name, std::move(promise_factory)); + } + + Arena* arena() { return spine_->party().arena(); } + + private: + RefCountedPtr spine_; +}; + +class CallHandler { + public: + explicit CallHandler(RefCountedPtr spine) + : spine_(std::move(spine)) {} + + auto PullClientInitialMetadata() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return Map(spine_->client_initial_metadata().receiver.Next(), + [](NextResult md) + -> ValueOrFailure { + if (!md.has_value()) return Failure{}; + return std::move(*md); + }); + } + + auto PushServerInitialMetadata(absl::optional md) { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return If( + md.has_value(), + [&md, this]() { + return Map( + spine_->server_initial_metadata().sender.Push(std::move(*md)), + [](bool ok) { return StatusFlag(ok); }); + }, + [this]() { + spine_->server_initial_metadata().sender.Close(); + return []() -> StatusFlag { return Success{}; }; + }); + } + + auto PushServerTrailingMetadata(ServerMetadataHandle md) { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + spine_->server_initial_metadata().sender.Close(); + spine_->server_to_client_messages().sender.Close(); + spine_->client_to_server_messages().receiver.CloseWithError(); + spine_->CallOnDone(); + return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)), + [](bool ok) { return StatusFlag(ok); }); + } + + auto PullMessage() { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return spine_->client_to_server_messages().receiver.Next(); + } + + auto PushMessage(MessageHandle message) { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + return Map( + spine_->server_to_client_messages().sender.Push(std::move(message)), + [](bool ok) { return StatusFlag(ok); }); + } + + void Cancel(ServerMetadataHandle status) { + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); + std::ignore = spine_->Cancel(std::move(status)); + } + + void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } + + template + auto CancelIfFails(Promise promise) { + return spine_->CancelIfFails(std::move(promise)); + } + + template + void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { + spine_->SpawnGuarded(name, std::move(promise_factory)); + } + + template + void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { + spine_->SpawnInfallible(name, std::move(promise_factory)); + } + + template + auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) { + return spine_->party().SpawnWaitable(name, std::move(promise_factory)); + } + + Arena* arena() { return spine_->party().arena(); } + + private: + RefCountedPtr spine_; +}; + +struct CallInitiatorAndHandler { + CallInitiator initiator; + CallHandler handler; +}; + +CallInitiatorAndHandler MakeCall( + grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena); + +template +auto OutgoingMessages(CallHalf h) { + struct Wrapper { + CallHalf h; + auto Next() { return h.PullMessage(); } + }; + return Wrapper{std::move(h)}; +} + +// Forward a call from `call_handler` to `call_initiator` (with initial metadata +// `client_initial_metadata`) +void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, + ClientMetadataHandle client_initial_metadata); + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_SPINE_H diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index f4c66b3383c..aa4134f88ee 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -215,89 +215,3 @@ grpc_transport_stream_op_batch* grpc_make_transport_stream_op( op->op.on_complete = &op->outer_on_complete; return &op->op; } - -namespace grpc_core { - -void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, - ClientMetadataHandle client_initial_metadata) { - // Send initial metadata. - call_initiator.SpawnGuarded( - "send_initial_metadata", - [client_initial_metadata = std::move(client_initial_metadata), - call_initiator]() mutable { - return call_initiator.PushClientInitialMetadata( - std::move(client_initial_metadata)); - }); - // Read messages from handler into initiator. - call_handler.SpawnGuarded("read_messages", [call_handler, - call_initiator]() mutable { - return Seq(ForEach(OutgoingMessages(call_handler), - [call_initiator](MessageHandle msg) mutable { - // Need to spawn a job into the initiator's activity to - // push the message in. - return call_initiator.SpawnWaitable( - "send_message", - [msg = std::move(msg), call_initiator]() mutable { - return call_initiator.CancelIfFails( - call_initiator.PushMessage(std::move(msg))); - }); - }), - [call_initiator](StatusFlag result) mutable { - call_initiator.SpawnInfallible( - "finish-downstream", [call_initiator, result]() mutable { - if (result.ok()) { - call_initiator.FinishSends(); - } else { - call_initiator.Cancel(); - } - return Empty{}; - }); - return result; - }); - }); - call_initiator.SpawnInfallible("read_the_things", [call_initiator, - call_handler]() mutable { - return Seq( - call_initiator.CancelIfFails(TrySeq( - call_initiator.PullServerInitialMetadata(), - [call_handler, - call_initiator](absl::optional md) mutable { - const bool has_md = md.has_value(); - call_handler.SpawnGuarded( - "recv_initial_metadata", - [md = std::move(md), call_handler]() mutable { - return call_handler.PushServerInitialMetadata( - std::move(md)); - }); - return If( - has_md, - ForEach(OutgoingMessages(call_initiator), - [call_handler](MessageHandle msg) mutable { - return call_handler.SpawnWaitable( - "recv_message", - [msg = std::move(msg), call_handler]() mutable { - return call_handler.CancelIfFails( - call_handler.PushMessage(std::move(msg))); - }); - }), - []() -> StatusFlag { return Success{}; }); - })), - call_initiator.PullServerTrailingMetadata(), - [call_handler](ServerMetadataHandle md) mutable { - call_handler.SpawnGuarded( - "recv_trailing_metadata", - [md = std::move(md), call_handler]() mutable { - return call_handler.PushServerTrailingMetadata(std::move(md)); - }); - return Empty{}; - }); - }); -} - -CallInitiatorAndHandler MakeCall( - grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena) { - auto spine = CallSpine::Create(event_engine, arena); - return {CallInitiator(spine), CallHandler(spine)}; -} - -} // namespace grpc_core diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 6b222eff3bf..91924f80550 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -42,7 +42,6 @@ #include "src/core/lib/channel/context.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/iomgr/call_combiner.h" @@ -53,16 +52,12 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/context.h" -#include "src/core/lib/promise/detail/status.h" -#include "src/core/lib/promise/if.h" #include "src/core/lib/promise/latch.h" -#include "src/core/lib/promise/party.h" #include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/prioritized_race.h" -#include "src/core/lib/promise/status_flag.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/call_final_info.h" +#include "src/core/lib/transport/call_spine.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/message.h" #include "src/core/lib/transport/metadata.h" @@ -160,386 +155,6 @@ struct CallArgs { using NextPromiseFactory = std::function(CallArgs)>; -// The common middle part of a call - a reference is held by each of -// CallInitiator and CallHandler - which provide interfaces that are appropriate -// for each side of a call. -// The spine will ultimately host the pipes, filters, and context for one part -// of a call: ie top-half client channel, sub channel call, server call. -// TODO(ctiller): eventually drop this when we don't need to reference into -// legacy promise calls anymore -class CallSpineInterface { - public: - virtual ~CallSpineInterface() = default; - virtual Pipe& client_initial_metadata() = 0; - virtual Pipe& server_initial_metadata() = 0; - virtual Pipe& client_to_server_messages() = 0; - virtual Pipe& server_to_client_messages() = 0; - virtual Pipe& server_trailing_metadata() = 0; - virtual Latch& cancel_latch() = 0; - // Add a callback to be called when server trailing metadata is received. - void OnDone(absl::AnyInvocable fn) { - if (on_done_ == nullptr) { - on_done_ = std::move(fn); - return; - } - on_done_ = [first = std::move(fn), next = std::move(on_done_)]() mutable { - first(); - next(); - }; - } - void CallOnDone() { - if (on_done_ != nullptr) std::exchange(on_done_, nullptr)(); - } - virtual Party& party() = 0; - virtual void IncrementRefCount() = 0; - virtual void Unref() = 0; - - // Cancel the call with the given metadata. - // Regarding the `MUST_USE_RESULT absl::nullopt_t`: - // Most cancellation calls right now happen in pipe interceptors; - // there `nullopt` indicates terminate processing of this pipe and close with - // error. - // It's convenient then to have the Cancel operation (setting the latch to - // terminate the call) be the last thing that occurs in a pipe interceptor, - // and this construction supports that (and has helped the author not write - // some bugs). - GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) { - GPR_DEBUG_ASSERT(GetContext() == &party()); - auto& c = cancel_latch(); - if (c.is_set()) return absl::nullopt; - c.Set(std::move(metadata)); - CallOnDone(); - client_initial_metadata().sender.CloseWithError(); - server_initial_metadata().sender.CloseWithError(); - client_to_server_messages().sender.CloseWithError(); - server_to_client_messages().sender.CloseWithError(); - return absl::nullopt; - } - - auto WaitForCancel() { - GPR_DEBUG_ASSERT(GetContext() == &party()); - return cancel_latch().Wait(); - } - - // Wrap a promise so that if it returns failure it automatically cancels - // the rest of the call. - // The resulting (returned) promise will resolve to Empty. - template - auto CancelIfFails(Promise promise) { - GPR_DEBUG_ASSERT(GetContext() == &party()); - using P = promise_detail::PromiseLike; - using ResultType = typename P::Result; - return Map(std::move(promise), [this](ResultType r) { - if (!IsStatusOk(r)) { - std::ignore = Cancel(StatusCast(r)); - } - return r; - }); - } - - // Spawn a promise that returns Empty{} and save some boilerplate handling - // that detail. - template - void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { - party().Spawn(name, std::move(promise_factory), [](Empty) {}); - } - - // Spawn a promise that returns some status-like type; if the status - // represents failure automatically cancel the rest of the call. - template - void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { - using FactoryType = - promise_detail::OncePromiseFactory; - using PromiseType = typename FactoryType::Promise; - using ResultType = typename PromiseType::Result; - static_assert( - std::is_same()))>::value, - "SpawnGuarded promise must return a status-like object"); - party().Spawn(name, std::move(promise_factory), [this](ResultType r) { - if (!IsStatusOk(r)) { - if (grpc_trace_promise_primitives.enabled()) { - gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s", - r.ToString().c_str()); - } - std::ignore = Cancel(StatusCast(std::move(r))); - } - }); - } - - private: - absl::AnyInvocable on_done_{nullptr}; -}; - -class CallSpine final : public CallSpineInterface, public Party { - public: - static RefCountedPtr Create( - grpc_event_engine::experimental::EventEngine* event_engine, - Arena* arena) { - return RefCountedPtr(arena->New(event_engine, arena)); - } - - Pipe& client_initial_metadata() override { - return client_initial_metadata_; - } - Pipe& server_initial_metadata() override { - return server_initial_metadata_; - } - Pipe& client_to_server_messages() override { - return client_to_server_messages_; - } - Pipe& server_to_client_messages() override { - return server_to_client_messages_; - } - Pipe& server_trailing_metadata() override { - return server_trailing_metadata_; - } - Latch& cancel_latch() override { return cancel_latch_; } - Party& party() override { return *this; } - void IncrementRefCount() override { Party::IncrementRefCount(); } - void Unref() override { Party::Unref(); } - - private: - friend class Arena; - CallSpine(grpc_event_engine::experimental::EventEngine* event_engine, - Arena* arena) - : Party(arena, 1), event_engine_(event_engine) {} - - class ScopedContext : public ScopedActivity, - public promise_detail::Context { - public: - explicit ScopedContext(CallSpine* spine) - : ScopedActivity(&spine->party()), Context(spine->arena()) {} - }; - - bool RunParty() override { - ScopedContext context(this); - return Party::RunParty(); - } - - void PartyOver() override { - Arena* a = arena(); - { - ScopedContext context(this); - CancelRemainingParticipants(); - a->DestroyManagedNewObjects(); - } - this->~CallSpine(); - a->Destroy(); - } - - grpc_event_engine::experimental::EventEngine* event_engine() const override { - return event_engine_; - } - - // Initial metadata from client to server - Pipe client_initial_metadata_{arena()}; - // Initial metadata from server to client - Pipe server_initial_metadata_{arena()}; - // Messages travelling from the application to the transport. - Pipe client_to_server_messages_{arena()}; - // Messages travelling from the transport to the application. - Pipe server_to_client_messages_{arena()}; - // Trailing metadata from server to client - Pipe server_trailing_metadata_{arena()}; - // Latch that can be set to terminate the call - Latch cancel_latch_; - // Event engine associated with this call - grpc_event_engine::experimental::EventEngine* const event_engine_; -}; - -class CallInitiator { - public: - explicit CallInitiator(RefCountedPtr spine) - : spine_(std::move(spine)) {} - - auto PushClientInitialMetadata(ClientMetadataHandle md) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->client_initial_metadata().sender.Push(std::move(md)), - [](bool ok) { return StatusFlag(ok); }); - } - - auto PullServerInitialMetadata() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->server_initial_metadata().receiver.Next(), - [](NextResult md) - -> ValueOrFailure> { - if (!md.has_value()) { - if (md.cancelled()) return Failure{}; - return absl::optional(); - } - return absl::optional(std::move(*md)); - }); - } - - auto PullServerTrailingMetadata() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return PrioritizedRace( - Map(spine_->server_trailing_metadata().receiver.Next(), - [spine = spine_]( - NextResult md) -> ServerMetadataHandle { - GPR_ASSERT(md.has_value()); - return std::move(*md); - }), - spine_->WaitForCancel()); - } - - auto PullMessage() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return spine_->server_to_client_messages().receiver.Next(); - } - - auto PushMessage(MessageHandle message) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map( - spine_->client_to_server_messages().sender.Push(std::move(message)), - [](bool r) { return StatusFlag(r); }); - } - - void FinishSends() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - spine_->client_to_server_messages().sender.Close(); - } - - template - auto CancelIfFails(Promise promise) { - return spine_->CancelIfFails(std::move(promise)); - } - - void Cancel() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - std::ignore = - spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError())); - } - - template - void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnGuarded(name, std::move(promise_factory)); - } - - template - void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnInfallible(name, std::move(promise_factory)); - } - - template - auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) { - return spine_->party().SpawnWaitable(name, std::move(promise_factory)); - } - - Arena* arena() { return spine_->party().arena(); } - - private: - RefCountedPtr spine_; -}; - -class CallHandler { - public: - explicit CallHandler(RefCountedPtr spine) - : spine_(std::move(spine)) {} - - auto PullClientInitialMetadata() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map(spine_->client_initial_metadata().receiver.Next(), - [](NextResult md) - -> ValueOrFailure { - if (!md.has_value()) return Failure{}; - return std::move(*md); - }); - } - - auto PushServerInitialMetadata(absl::optional md) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return If( - md.has_value(), - [&md, this]() { - return Map( - spine_->server_initial_metadata().sender.Push(std::move(*md)), - [](bool ok) { return StatusFlag(ok); }); - }, - [this]() { - spine_->server_initial_metadata().sender.Close(); - return []() -> StatusFlag { return Success{}; }; - }); - } - - auto PushServerTrailingMetadata(ServerMetadataHandle md) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - spine_->server_initial_metadata().sender.Close(); - spine_->server_to_client_messages().sender.Close(); - spine_->client_to_server_messages().receiver.CloseWithError(); - spine_->CallOnDone(); - return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)), - [](bool ok) { return StatusFlag(ok); }); - } - - auto PullMessage() { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return spine_->client_to_server_messages().receiver.Next(); - } - - auto PushMessage(MessageHandle message) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - return Map( - spine_->server_to_client_messages().sender.Push(std::move(message)), - [](bool ok) { return StatusFlag(ok); }); - } - - void Cancel(ServerMetadataHandle status) { - GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); - std::ignore = spine_->Cancel(std::move(status)); - } - - void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } - - template - auto CancelIfFails(Promise promise) { - return spine_->CancelIfFails(std::move(promise)); - } - - template - void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnGuarded(name, std::move(promise_factory)); - } - - template - void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) { - spine_->SpawnInfallible(name, std::move(promise_factory)); - } - - template - auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) { - return spine_->party().SpawnWaitable(name, std::move(promise_factory)); - } - - Arena* arena() { return spine_->party().arena(); } - - private: - RefCountedPtr spine_; -}; - -struct CallInitiatorAndHandler { - CallInitiator initiator; - CallHandler handler; -}; - -CallInitiatorAndHandler MakeCall( - grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena); - -template -auto OutgoingMessages(CallHalf h) { - struct Wrapper { - CallHalf h; - auto Next() { return h.PullMessage(); } - }; - return Wrapper{std::move(h)}; -} - -// Forward a call from `call_handler` to `call_initiator` (with initial metadata -// `client_initial_metadata`) -void ForwardCall(CallHandler call_handler, CallInitiator call_initiator, - ClientMetadataHandle client_initial_metadata); - } // namespace grpc_core // forward declarations diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 3f63633766f..7c4a43be9ca 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -812,6 +812,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/transport/bdp_estimator.cc', 'src/core/lib/transport/call_filters.cc', 'src/core/lib/transport/call_final_info.cc', + 'src/core/lib/transport/call_spine.cc', 'src/core/lib/transport/connectivity_state.cc', 'src/core/lib/transport/error_utils.cc', 'src/core/lib/transport/handshaker.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 3b0353e91a9..320cfe8c255 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2904,6 +2904,8 @@ src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_filters.h \ src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_final_info.h \ +src/core/lib/transport/call_spine.cc \ +src/core/lib/transport/call_spine.h \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.h \ src/core/lib/transport/custom_metadata.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 244a3e18d5d..4cc0d41398b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2685,6 +2685,8 @@ src/core/lib/transport/call_filters.cc \ src/core/lib/transport/call_filters.h \ src/core/lib/transport/call_final_info.cc \ src/core/lib/transport/call_final_info.h \ +src/core/lib/transport/call_spine.cc \ +src/core/lib/transport/call_spine.h \ src/core/lib/transport/connectivity_state.cc \ src/core/lib/transport/connectivity_state.h \ src/core/lib/transport/custom_metadata.h \