Merge branch 'transport-refs-7' into transport-refs-7.5

pull/36732/head
Craig Tiller 10 months ago
commit a2227b3f99
  1. 1
      BUILD
  2. 2
      src/core/client_channel/client_channel.h
  3. 80
      src/core/client_channel/subchannel.cc
  4. 6
      src/core/client_channel/subchannel.h
  5. 5
      test/core/call/yodel/yodel_test.cc
  6. 21
      test/core/client_channel/BUILD
  7. 5
      test/core/client_channel/client_channel_test.cc
  8. 179
      test/core/client_channel/connected_subchannel_test.cc

@ -3798,6 +3798,7 @@ grpc_cc_library(
"//src/core:grpc_service_config",
"//src/core:idle_filter_state",
"//src/core:init_internally",
"//src/core:interception_chain",
"//src/core:iomgr_fwd",
"//src/core:json",
"//src/core:latch",

@ -71,7 +71,7 @@ class ClientChannel : public Channel {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
RefCountedPtr<UnstartedCallDestination> call_destination() override {
return subchannel_->connected_subchannel();
return subchannel_->connected_subchannel()->unstarted_call_destination();
}
void RequestConnection() override { subchannel_->RequestConnection(); }

@ -55,6 +55,8 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/sync.h"
@ -68,6 +70,7 @@
#include "src/core/lib/surface/init_internally.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/interception_chain.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/util/alloc.h"
#include "src/core/util/useful.h"
@ -101,7 +104,7 @@ DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
ConnectedSubchannel::ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: UnstartedCallDestination(
: RefCounted<ConnectedSubchannel>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
? "ConnectedSubchannel"
: nullptr),
@ -140,8 +143,9 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
Crash("call v3 ping method called in legacy impl");
}
void StartCall(UnstartedCallHandler) override {
Crash("call v3 StartCall() method called in legacy impl");
RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
const override {
Crash("call v3 unstarted_call_destination method called in legacy impl");
}
grpc_channel_stack* channel_stack() const override {
@ -163,8 +167,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
// handles the channelz updates.
return OnCancel(
Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)),
[self = RefAsSubclass<ConnectedSubchannel>()](
ServerMetadataHandle metadata) {
[self = Ref()](ServerMetadataHandle metadata) {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
@ -176,7 +179,7 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
}
return metadata;
}),
[self = RefAsSubclass<ConnectedSubchannel>()]() {
[self = Ref()]() {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
@ -203,12 +206,30 @@ class LegacyConnectedSubchannel : public ConnectedSubchannel {
class NewConnectedSubchannel : public ConnectedSubchannel {
public:
class TransportCallDestination final : public CallDestination {
public:
explicit TransportCallDestination(OrphanablePtr<ClientTransport> transport)
: transport_(std::move(transport)) {}
ClientTransport* transport() { return transport_.get(); }
void HandleCall(CallHandler handler) override {
transport_->StartCall(std::move(handler));
}
void Orphaned() override { transport_.reset(); }
private:
OrphanablePtr<ClientTransport> transport_;
};
NewConnectedSubchannel(
RefCountedPtr<CallFilters::Stack> filter_stack,
OrphanablePtr<Transport> transport, const ChannelArgs& args,
RefCountedPtr<UnstartedCallDestination> call_destination,
RefCountedPtr<TransportCallDestination> transport,
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
filter_stack_(std::move(filter_stack)),
call_destination_(std::move(call_destination)),
transport_(std::move(transport)) {}
void StartWatch(
@ -221,9 +242,9 @@ class NewConnectedSubchannel : public ConnectedSubchannel {
// FIXME: add new transport API for this in v3 stack
}
void StartCall(UnstartedCallHandler unstarted_handler) override {
auto handler = unstarted_handler.StartCall(filter_stack_);
transport_->client_transport()->StartCall(std::move(handler));
RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
const override {
return call_destination_;
}
grpc_channel_stack* channel_stack() const override { return nullptr; }
@ -240,8 +261,8 @@ class NewConnectedSubchannel : public ConnectedSubchannel {
}
private:
RefCountedPtr<CallFilters::Stack> filter_stack_;
OrphanablePtr<Transport> transport_;
RefCountedPtr<UnstartedCallDestination> call_destination_;
RefCountedPtr<TransportCallDestination> transport_;
};
//
@ -849,7 +870,7 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
bool Subchannel::PublishTransportLocked() {
auto socket_node = std::move(connecting_result_.socket_node);
if (!IsCallV3Enabled()) {
if (connecting_result_.transport->filter_stack_transport() != nullptr) {
// Construct channel stack.
// Builder takes ownership of transport.
ChannelStackBuilderImpl builder(
@ -870,15 +891,28 @@ bool Subchannel::PublishTransportLocked() {
connected_subchannel_ = MakeRefCounted<LegacyConnectedSubchannel>(
std::move(*stack), args_, channelz_node_);
} else {
// Call v3 stack.
CallFilters::StackBuilder builder;
// FIXME: add filters registered for CLIENT_SUBCHANNEL
auto filter_stack = builder.Build();
OrphanablePtr<ClientTransport> transport(
std::exchange(connecting_result_.transport, nullptr)
->client_transport());
InterceptionChainBuilder builder(
connecting_result_.channel_args.SetObject(transport.get()));
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_SUBCHANNEL, builder);
auto transport_destination =
MakeRefCounted<NewConnectedSubchannel::TransportCallDestination>(
std::move(transport));
auto call_destination = builder.Build(transport_destination);
if (!call_destination.ok()) {
connecting_result_.Reset();
gpr_log(GPR_ERROR,
"subchannel %p %s: error initializing subchannel stack: %s", this,
key_.ToString().c_str(),
call_destination.status().ToString().c_str());
return false;
}
connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
std::move(filter_stack),
OrphanablePtr<Transport>(
std::exchange(connecting_result_.transport, nullptr)),
args_, channelz_node_);
std::move(*call_destination), std::move(transport_destination), args_,
channelz_node_);
}
connecting_result_.Reset();
// Publish.

@ -64,7 +64,7 @@ namespace grpc_core {
class SubchannelCall;
class ConnectedSubchannel : public UnstartedCallDestination {
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
public:
const ChannelArgs& args() const { return args_; }
channelz::SubchannelNode* channelz_subchannel() const {
@ -77,6 +77,8 @@ class ConnectedSubchannel : public UnstartedCallDestination {
// Methods for v3 stack.
virtual void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) = 0;
virtual RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
const = 0;
// Methods for legacy stack.
virtual grpc_channel_stack* channel_stack() const = 0;
@ -85,8 +87,6 @@ class ConnectedSubchannel : public UnstartedCallDestination {
CallArgs call_args) = 0;
virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;
void Orphaned() override {}
protected:
ConnectedSubchannel(
const ChannelArgs& args,

@ -19,6 +19,7 @@
#include "absl/random/random.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/resource_quota/resource_quota.h"
@ -153,7 +154,10 @@ void YodelTest::RunTest() {
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test-allocator"),
1024);
{
ExecCtx exec_ctx;
TestImpl();
}
EXPECT_EQ(pending_actions_.size(), 0)
<< "There are still pending actions: did you forget to call "
"WaitForAllPendingWork()?";
@ -165,6 +169,7 @@ void YodelTest::RunTest() {
void YodelTest::TickUntilTrue(absl::FunctionRef<bool()> poll) {
WatchDog watchdog(this);
while (!poll()) {
ExecCtx exec_ctx;
state_->event_engine->Tick();
}
}

@ -55,6 +55,27 @@ grpc_yodel_test(
],
)
grpc_cc_library(
name = "connected_subchannel_test_lib",
testonly = True,
srcs = ["connected_subchannel_test.cc"],
external_deps = ["gtest"],
deps = [
"//:grpc_client_channel",
"//test/core/call/yodel:yodel_test",
],
alwayslink = 1,
)
grpc_yodel_test(
name = "connected_subchannel",
deps = [
":connected_subchannel_test_lib",
"//:grpc",
"//test/core/test_util:grpc_test_util",
],
)
grpc_cc_library(
name = "load_balanced_call_destination_test_lib",
testonly = True,

@ -62,12 +62,11 @@ class ClientChannelTest : public YodelTest {
}
CallHandler TickUntilCallStarted() {
auto poll = [this]() -> Poll<CallHandler> {
return TickUntil<CallHandler>([this]() -> Poll<CallHandler> {
auto handler = call_destination_->PopHandler();
if (handler.has_value()) return std::move(*handler);
return Pending();
};
return TickUntil(absl::FunctionRef<Poll<CallHandler>()>(poll));
});
}
void QueueNameResolutionResult(Resolver::Result result) {

@ -0,0 +1,179 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <atomic>
#include <memory>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/client_channel/client_channel.h"
#include "src/core/client_channel/local_subchannel_pool.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/config/core_configuration.h"
#include "test/core/call/yodel/yodel_test.h"
namespace grpc_core {
using EventEngine = grpc_event_engine::experimental::EventEngine;
namespace {
const absl::string_view kTestPath = "/test_method";
const absl::string_view kTestAddress = "ipv4:127.0.0.1:1234";
const absl::string_view kDefaultAuthority = "test-authority";
} // namespace
class ConnectedSubchannelTest : public YodelTest {
public:
protected:
using YodelTest::YodelTest;
RefCountedPtr<ConnectedSubchannel> InitChannel(const ChannelArgs& args) {
grpc_resolved_address addr;
CHECK(grpc_parse_uri(URI::Parse(kTestAddress).value(), &addr));
auto subchannel = Subchannel::Create(MakeOrphanable<TestConnector>(this),
addr, CompleteArgs(args));
{
ExecCtx exec_ctx;
subchannel->RequestConnection();
}
return TickUntil<RefCountedPtr<ConnectedSubchannel>>(
[subchannel]() -> Poll<RefCountedPtr<ConnectedSubchannel>> {
auto connected_subchannel = subchannel->connected_subchannel();
if (connected_subchannel != nullptr) return connected_subchannel;
return Pending();
});
}
ClientMetadataHandle MakeClientInitialMetadata() {
auto client_initial_metadata = Arena::MakePooled<ClientMetadata>();
client_initial_metadata->Set(HttpPathMetadata(),
Slice::FromCopiedString(kTestPath));
return client_initial_metadata;
}
CallInitiatorAndHandler MakeCall(
ClientMetadataHandle client_initial_metadata) {
return MakeCallPair(
std::move(client_initial_metadata), event_engine().get(),
call_arena_allocator_->MakeArena(), call_arena_allocator_, nullptr);
}
CallHandler TickUntilCallStarted() {
return TickUntil<CallHandler>([this]() -> Poll<CallHandler> {
auto handler = PopHandler();
if (handler.has_value()) return std::move(*handler);
return Pending();
});
}
private:
class TestTransport final : public ClientTransport {
public:
explicit TestTransport(ConnectedSubchannelTest* test) : test_(test) {}
void Orphan() override {
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(),
"transport-orphaned");
}
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
ServerTransport* server_transport() override { return nullptr; }
absl::string_view GetTransportName() const override { return "test"; }
void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override {}
void SetPollsetSet(grpc_stream* stream,
grpc_pollset_set* pollset_set) override {}
void PerformOp(grpc_transport_op* op) override {
LOG(INFO) << "PerformOp: " << grpc_transport_op_string(op);
if (op->start_connectivity_watch != nullptr) {
state_tracker_.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
}
grpc_endpoint* GetEndpoint() override { return nullptr; }
void StartCall(CallHandler call_handler) override {
test_->handlers_.push(std::move(call_handler));
}
private:
ConnectedSubchannelTest* const test_;
ConnectivityStateTracker state_tracker_{"test-transport"};
};
class TestConnector final : public SubchannelConnector {
public:
explicit TestConnector(ConnectedSubchannelTest* test) : test_(test) {}
void Connect(const Args& args, Result* result,
grpc_closure* notify) override {
result->channel_args = args.channel_args;
result->transport = MakeOrphanable<TestTransport>(test_).release();
ExecCtx::Run(DEBUG_LOCATION, notify, absl::OkStatus());
}
void Shutdown(grpc_error_handle error) override {}
private:
ConnectedSubchannelTest* const test_;
};
ChannelArgs CompleteArgs(const ChannelArgs& args) {
return args.SetObject(ResourceQuota::Default())
.SetObject(std::static_pointer_cast<EventEngine>(event_engine()))
.SetObject(MakeRefCounted<LocalSubchannelPool>())
.Set(GRPC_ARG_DEFAULT_AUTHORITY, kDefaultAuthority);
}
void InitCoreConfiguration() override {}
void Shutdown() override {}
absl::optional<CallHandler> PopHandler() {
if (handlers_.empty()) return absl::nullopt;
auto handler = std::move(handlers_.front());
handlers_.pop();
return handler;
}
std::queue<CallHandler> handlers_;
RefCountedPtr<CallArenaAllocator> call_arena_allocator_ =
MakeRefCounted<CallArenaAllocator>(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test"),
1024);
};
#define CONNECTED_SUBCHANNEL_CHANNEL_TEST(name) \
YODEL_TEST(ConnectedSubchannelTest, name)
CONNECTED_SUBCHANNEL_CHANNEL_TEST(NoOp) { InitChannel(ChannelArgs()); }
CONNECTED_SUBCHANNEL_CHANNEL_TEST(StartCall) {
auto channel = InitChannel(ChannelArgs());
auto call = MakeCall(MakeClientInitialMetadata());
SpawnTestSeq(
call.handler, "start-call", [channel, handler = call.handler]() mutable {
channel->unstarted_call_destination()->StartCall(std::move(handler));
return Empty{};
});
auto handler = TickUntilCallStarted();
WaitForAllPendingWork();
}
} // namespace grpc_core
Loading…
Cancel
Save