diff --git a/BUILD b/BUILD index 6ab2604c972..b1558619135 100644 --- a/BUILD +++ b/BUILD @@ -3798,6 +3798,7 @@ grpc_cc_library( "//src/core:grpc_service_config", "//src/core:idle_filter_state", "//src/core:init_internally", + "//src/core:interception_chain", "//src/core:iomgr_fwd", "//src/core:json", "//src/core:latch", diff --git a/src/core/client_channel/client_channel.h b/src/core/client_channel/client_channel.h index 128a9e5ce91..da5c7658691 100644 --- a/src/core/client_channel/client_channel.h +++ b/src/core/client_channel/client_channel.h @@ -71,7 +71,7 @@ class ClientChannel : public Channel { ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_); RefCountedPtr call_destination() override { - return subchannel_->connected_subchannel(); + return subchannel_->connected_subchannel()->unstarted_call_destination(); } void RequestConnection() override { subchannel_->RequestConnection(); } diff --git a/src/core/client_channel/subchannel.cc b/src/core/client_channel/subchannel.cc index 2ab211e7efc..c003497e083 100644 --- a/src/core/client_channel/subchannel.cc +++ b/src/core/client_channel/subchannel.cc @@ -55,6 +55,8 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/sync.h" @@ -68,6 +70,7 @@ #include "src/core/lib/surface/init_internally.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/interception_chain.h" #include "src/core/lib/transport/transport.h" #include "src/core/util/alloc.h" #include "src/core/util/useful.h" @@ -101,7 +104,7 @@ DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); ConnectedSubchannel::ConnectedSubchannel( const ChannelArgs& args, RefCountedPtr channelz_subchannel) - : UnstartedCallDestination( + : RefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "ConnectedSubchannel" : nullptr), @@ -140,8 +143,9 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel { Crash("call v3 ping method called in legacy impl"); } - void StartCall(UnstartedCallHandler) override { - Crash("call v3 StartCall() method called in legacy impl"); + RefCountedPtr unstarted_call_destination() + const override { + Crash("call v3 unstarted_call_destination method called in legacy impl"); } grpc_channel_stack* channel_stack() const override { @@ -163,8 +167,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel { // handles the channelz updates. return OnCancel( Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)), - [self = RefAsSubclass()]( - ServerMetadataHandle metadata) { + [self = Ref()](ServerMetadataHandle metadata) { channelz::SubchannelNode* channelz_subchannel = self->channelz_subchannel(); GPR_ASSERT(channelz_subchannel != nullptr); @@ -176,7 +179,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel { } return metadata; }), - [self = RefAsSubclass()]() { + [self = Ref()]() { channelz::SubchannelNode* channelz_subchannel = self->channelz_subchannel(); GPR_ASSERT(channelz_subchannel != nullptr); @@ -203,12 +206,30 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel { class NewConnectedSubchannel : public ConnectedSubchannel { public: + class TransportCallDestination final : public CallDestination { + public: + explicit TransportCallDestination(OrphanablePtr transport) + : transport_(std::move(transport)) {} + + ClientTransport* transport() { return transport_.get(); } + + void HandleCall(CallHandler handler) override { + transport_->StartCall(std::move(handler)); + } + + void Orphaned() override { transport_.reset(); } + + private: + OrphanablePtr transport_; + }; + NewConnectedSubchannel( - RefCountedPtr filter_stack, - OrphanablePtr transport, const ChannelArgs& args, + RefCountedPtr call_destination, + RefCountedPtr transport, + const ChannelArgs& args, RefCountedPtr channelz_subchannel) : ConnectedSubchannel(args, std::move(channelz_subchannel)), - filter_stack_(std::move(filter_stack)), + call_destination_(std::move(call_destination)), transport_(std::move(transport)) {} void StartWatch( @@ -221,9 +242,9 @@ class NewConnectedSubchannel : public ConnectedSubchannel { // FIXME: add new transport API for this in v3 stack } - void StartCall(UnstartedCallHandler unstarted_handler) override { - auto handler = unstarted_handler.StartCall(filter_stack_); - transport_->client_transport()->StartCall(std::move(handler)); + RefCountedPtr unstarted_call_destination() + const override { + return call_destination_; } grpc_channel_stack* channel_stack() const override { return nullptr; } @@ -240,8 +261,8 @@ class NewConnectedSubchannel : public ConnectedSubchannel { } private: - RefCountedPtr filter_stack_; - OrphanablePtr transport_; + RefCountedPtr call_destination_; + RefCountedPtr transport_; }; // @@ -849,7 +870,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { bool Subchannel::PublishTransportLocked() { auto socket_node = std::move(connecting_result_.socket_node); - if (!IsCallV3Enabled()) { + if (connecting_result_.transport->filter_stack_transport() != nullptr) { // Construct channel stack. // Builder takes ownership of transport. ChannelStackBuilderImpl builder( @@ -870,15 +891,28 @@ bool Subchannel::PublishTransportLocked() { connected_subchannel_ = MakeRefCounted( std::move(*stack), args_, channelz_node_); } else { - // Call v3 stack. - CallFilters::StackBuilder builder; - // FIXME: add filters registered for CLIENT_SUBCHANNEL - auto filter_stack = builder.Build(); + OrphanablePtr transport( + std::exchange(connecting_result_.transport, nullptr) + ->client_transport()); + InterceptionChainBuilder builder( + connecting_result_.channel_args.SetObject(transport.get())); + CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder( + GRPC_CLIENT_SUBCHANNEL, builder); + auto transport_destination = + MakeRefCounted( + std::move(transport)); + auto call_destination = builder.Build(transport_destination); + if (!call_destination.ok()) { + connecting_result_.Reset(); + gpr_log(GPR_ERROR, + "subchannel %p %s: error initializing subchannel stack: %s", this, + key_.ToString().c_str(), + call_destination.status().ToString().c_str()); + return false; + } connected_subchannel_ = MakeRefCounted( - std::move(filter_stack), - OrphanablePtr( - std::exchange(connecting_result_.transport, nullptr)), - args_, channelz_node_); + std::move(*call_destination), std::move(transport_destination), args_, + channelz_node_); } connecting_result_.Reset(); // Publish. diff --git a/src/core/client_channel/subchannel.h b/src/core/client_channel/subchannel.h index 3168eb8348d..e40bfadf889 100644 --- a/src/core/client_channel/subchannel.h +++ b/src/core/client_channel/subchannel.h @@ -64,7 +64,7 @@ namespace grpc_core { class SubchannelCall; -class ConnectedSubchannel : public UnstartedCallDestination { +class ConnectedSubchannel : public RefCounted { public: const ChannelArgs& args() const { return args_; } channelz::SubchannelNode* channelz_subchannel() const { @@ -77,6 +77,8 @@ class ConnectedSubchannel : public UnstartedCallDestination { // Methods for v3 stack. virtual void Ping(absl::AnyInvocable on_ack) = 0; + virtual RefCountedPtr unstarted_call_destination() + const = 0; // Methods for legacy stack. virtual grpc_channel_stack* channel_stack() const = 0; @@ -85,8 +87,6 @@ class ConnectedSubchannel : public UnstartedCallDestination { CallArgs call_args) = 0; virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0; - void Orphaned() override {} - protected: ConnectedSubchannel( const ChannelArgs& args, diff --git a/test/core/call/yodel/yodel_test.cc b/test/core/call/yodel/yodel_test.cc index 33c7aa4a420..1b4d359086d 100644 --- a/test/core/call/yodel/yodel_test.cc +++ b/test/core/call/yodel/yodel_test.cc @@ -19,6 +19,7 @@ #include "absl/random/random.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/resource_quota/resource_quota.h" @@ -153,7 +154,10 @@ void YodelTest::RunTest() { ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( "test-allocator"), 1024); - TestImpl(); + { + ExecCtx exec_ctx; + TestImpl(); + } EXPECT_EQ(pending_actions_.size(), 0) << "There are still pending actions: did you forget to call " "WaitForAllPendingWork()?"; @@ -165,6 +169,7 @@ void YodelTest::RunTest() { void YodelTest::TickUntilTrue(absl::FunctionRef poll) { WatchDog watchdog(this); while (!poll()) { + ExecCtx exec_ctx; state_->event_engine->Tick(); } } diff --git a/test/core/client_channel/BUILD b/test/core/client_channel/BUILD index 3e6cdb03b50..8d1733d0a96 100644 --- a/test/core/client_channel/BUILD +++ b/test/core/client_channel/BUILD @@ -55,6 +55,27 @@ grpc_yodel_test( ], ) +grpc_cc_library( + name = "connected_subchannel_test_lib", + testonly = True, + srcs = ["connected_subchannel_test.cc"], + external_deps = ["gtest"], + deps = [ + "//:grpc_client_channel", + "//test/core/call/yodel:yodel_test", + ], + alwayslink = 1, +) + +grpc_yodel_test( + name = "connected_subchannel", + deps = [ + ":connected_subchannel_test_lib", + "//:grpc", + "//test/core/test_util:grpc_test_util", + ], +) + grpc_cc_library( name = "load_balanced_call_destination_test_lib", testonly = True, diff --git a/test/core/client_channel/client_channel_test.cc b/test/core/client_channel/client_channel_test.cc index 5ac74248d72..c3fff8e12c1 100644 --- a/test/core/client_channel/client_channel_test.cc +++ b/test/core/client_channel/client_channel_test.cc @@ -62,12 +62,11 @@ class ClientChannelTest : public YodelTest { } CallHandler TickUntilCallStarted() { - auto poll = [this]() -> Poll { + return TickUntil([this]() -> Poll { auto handler = call_destination_->PopHandler(); if (handler.has_value()) return std::move(*handler); return Pending(); - }; - return TickUntil(absl::FunctionRef()>(poll)); + }); } void QueueNameResolutionResult(Resolver::Result result) { diff --git a/test/core/client_channel/connected_subchannel_test.cc b/test/core/client_channel/connected_subchannel_test.cc new file mode 100644 index 00000000000..1862b56b68d --- /dev/null +++ b/test/core/client_channel/connected_subchannel_test.cc @@ -0,0 +1,179 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/client_channel/client_channel.h" +#include "src/core/client_channel/local_subchannel_pool.h" +#include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/config/core_configuration.h" +#include "test/core/call/yodel/yodel_test.h" + +namespace grpc_core { + +using EventEngine = grpc_event_engine::experimental::EventEngine; + +namespace { +const absl::string_view kTestPath = "/test_method"; +const absl::string_view kTestAddress = "ipv4:127.0.0.1:1234"; +const absl::string_view kDefaultAuthority = "test-authority"; +} // namespace + +class ConnectedSubchannelTest : public YodelTest { + public: + protected: + using YodelTest::YodelTest; + + RefCountedPtr InitChannel(const ChannelArgs& args) { + grpc_resolved_address addr; + CHECK(grpc_parse_uri(URI::Parse(kTestAddress).value(), &addr)); + auto subchannel = Subchannel::Create(MakeOrphanable(this), + addr, CompleteArgs(args)); + { + ExecCtx exec_ctx; + subchannel->RequestConnection(); + } + return TickUntil>( + [subchannel]() -> Poll> { + auto connected_subchannel = subchannel->connected_subchannel(); + if (connected_subchannel != nullptr) return connected_subchannel; + return Pending(); + }); + } + + ClientMetadataHandle MakeClientInitialMetadata() { + auto client_initial_metadata = Arena::MakePooled(); + client_initial_metadata->Set(HttpPathMetadata(), + Slice::FromCopiedString(kTestPath)); + return client_initial_metadata; + } + + CallInitiatorAndHandler MakeCall( + ClientMetadataHandle client_initial_metadata) { + return MakeCallPair( + std::move(client_initial_metadata), event_engine().get(), + call_arena_allocator_->MakeArena(), call_arena_allocator_, nullptr); + } + + CallHandler TickUntilCallStarted() { + return TickUntil([this]() -> Poll { + auto handler = PopHandler(); + if (handler.has_value()) return std::move(*handler); + return Pending(); + }); + } + + private: + class TestTransport final : public ClientTransport { + public: + explicit TestTransport(ConnectedSubchannelTest* test) : test_(test) {} + + void Orphan() override { + state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(), + "transport-orphaned"); + } + + FilterStackTransport* filter_stack_transport() override { return nullptr; } + ClientTransport* client_transport() override { return this; } + ServerTransport* server_transport() override { return nullptr; } + absl::string_view GetTransportName() const override { return "test"; } + void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override {} + void SetPollsetSet(grpc_stream* stream, + grpc_pollset_set* pollset_set) override {} + void PerformOp(grpc_transport_op* op) override { + LOG(INFO) << "PerformOp: " << grpc_transport_op_string(op); + if (op->start_connectivity_watch != nullptr) { + state_tracker_.AddWatcher(op->start_connectivity_watch_state, + std::move(op->start_connectivity_watch)); + } + } + grpc_endpoint* GetEndpoint() override { return nullptr; } + + void StartCall(CallHandler call_handler) override { + test_->handlers_.push(std::move(call_handler)); + } + + private: + ConnectedSubchannelTest* const test_; + ConnectivityStateTracker state_tracker_{"test-transport"}; + }; + + class TestConnector final : public SubchannelConnector { + public: + explicit TestConnector(ConnectedSubchannelTest* test) : test_(test) {} + + void Connect(const Args& args, Result* result, + grpc_closure* notify) override { + result->channel_args = args.channel_args; + result->transport = MakeOrphanable(test_).release(); + ExecCtx::Run(DEBUG_LOCATION, notify, absl::OkStatus()); + } + + void Shutdown(grpc_error_handle error) override {} + + private: + ConnectedSubchannelTest* const test_; + }; + + ChannelArgs CompleteArgs(const ChannelArgs& args) { + return args.SetObject(ResourceQuota::Default()) + .SetObject(std::static_pointer_cast(event_engine())) + .SetObject(MakeRefCounted()) + .Set(GRPC_ARG_DEFAULT_AUTHORITY, kDefaultAuthority); + } + + void InitCoreConfiguration() override {} + + void Shutdown() override {} + + absl::optional PopHandler() { + if (handlers_.empty()) return absl::nullopt; + auto handler = std::move(handlers_.front()); + handlers_.pop(); + return handler; + } + + std::queue handlers_; + RefCountedPtr call_arena_allocator_ = + MakeRefCounted( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( + "test"), + 1024); +}; + +#define CONNECTED_SUBCHANNEL_CHANNEL_TEST(name) \ + YODEL_TEST(ConnectedSubchannelTest, name) + +CONNECTED_SUBCHANNEL_CHANNEL_TEST(NoOp) { InitChannel(ChannelArgs()); } + +CONNECTED_SUBCHANNEL_CHANNEL_TEST(StartCall) { + auto channel = InitChannel(ChannelArgs()); + auto call = MakeCall(MakeClientInitialMetadata()); + SpawnTestSeq( + call.handler, "start-call", [channel, handler = call.handler]() mutable { + channel->unstarted_call_destination()->StartCall(std::move(handler)); + return Empty{}; + }); + auto handler = TickUntilCallStarted(); + WaitForAllPendingWork(); +} + +} // namespace grpc_core