pull/36732/head
Craig Tiller 11 months ago
parent 2edc6177c7
commit ac23dd62af
  1. 8
      src/core/client_channel/client_channel.cc
  2. 4
      src/core/lib/gprpp/work_serializer.cc
  3. 126
      test/core/client_channel/client_channel_test.cc

@ -539,6 +539,10 @@ absl::StatusOr<OrphanablePtr<Channel>> ClientChannel::Create(
return absl::InternalError(
"Missing call destination factory in args for client channel");
}
if (channel_args.GetObject<EventEngine>() == nullptr) {
return absl::InternalError(
"Missing event engine in args for client channel");
}
// Success. Construct channel.
return MakeOrphanable<ClientChannel>(
std::move(target), std::move(channel_args), std::move(uri_to_resolve),
@ -567,7 +571,7 @@ ClientChannel::ClientChannel(
CallDestinationFactory* call_destination_factory)
: Channel(std::move(target), channel_args),
channel_args_(std::move(channel_args)),
event_engine_(channel_args.GetObjectRef<EventEngine>()),
event_engine_(channel_args_.GetObjectRef<EventEngine>()),
uri_to_resolve_(std::move(uri_to_resolve)),
service_config_parser_index_(
internal::ClientChannelServiceConfigParser::ParserIndex()),
@ -779,7 +783,7 @@ CallInitiator ClientChannel::CreateCall(
return got_result;
})),
// Handle resolver result.
[self, &unstarted_handler](
[self, unstarted_handler](
std::tuple<absl::StatusOr<ResolverDataForCalls>, bool>
result_and_delayed) mutable {
auto& resolver_data = std::get<0>(result_and_delayed);

@ -301,7 +301,9 @@ class WorkSerializer::DispatchingWorkSerializer final
explicit DispatchingWorkSerializer(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine)
: event_engine_(std::move(event_engine)) {}
: event_engine_(std::move(event_engine)) {
CHECK(event_engine_ != nullptr);
}
void Run(std::function<void()> callback,
const DebugLocation& location) override;
void Schedule(std::function<void()> callback,

@ -14,18 +14,29 @@
#include "src/core/client_channel/client_channel.h"
#include <atomic>
#include <memory>
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"
#include <grpc/grpc.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/config/core_configuration.h"
#include "test/core/call/yodel/yodel_test.h"
namespace grpc_core {
using EventEngine = grpc_event_engine::experimental::EventEngine;
namespace {
const absl::string_view kTestTarget = "test:///target";
const absl::string_view kTestScheme = "test";
const absl::string_view kTestTarget = "/target";
const absl::string_view kTestPath = "/test_method";
std::string TestTarget() {
return absl::StrCat(kTestScheme, "://", kTestTarget);
}
} // namespace
class ClientChannelTest : public YodelTest {
@ -34,8 +45,7 @@ class ClientChannelTest : public YodelTest {
using YodelTest::YodelTest;
ClientChannel& InitChannel(const ChannelArgs& args) {
auto channel =
ClientChannel::Create(std::string(kTestTarget), CompleteArgs(args));
auto channel = ClientChannel::Create(TestTarget(), CompleteArgs(args));
CHECK_OK(channel);
channel_ = OrphanablePtr<ClientChannel>(
DownCast<ClientChannel*>(channel->release()));
@ -60,12 +70,46 @@ class ClientChannelTest : public YodelTest {
return TickUntil(absl::FunctionRef<Poll<CallHandler>()>(poll));
}
void QueueNameResolutionResult(Resolver::Result result) {
if (resolver_ != nullptr) {
resolver_->QueueNameResolutionResult(std::move(result));
} else {
early_resolver_results_.push(std::move(result));
}
}
Resolver::Result MakeSuccessfulResolutionResult(
absl::string_view endpoint_address) {
Resolver::Result result;
grpc_resolved_address address;
CHECK(grpc_parse_uri(URI::Parse(endpoint_address).value(), &address));
result.addresses = EndpointAddressesList({EndpointAddresses{address, {}}});
return result;
}
private:
class TestConnector final : public SubchannelConnector {
public:
void Connect(const Args& args, Result* result,
grpc_closure* notify) override {
CHECK_EQ(notify_, nullptr);
notify_ = notify;
}
void Shutdown(grpc_error_handle error) override {
if (notify_ != nullptr) ExecCtx::Run(DEBUG_LOCATION, notify_, error);
}
private:
grpc_closure* notify_ = nullptr;
};
class TestClientChannelFactory final : public ClientChannelFactory {
public:
RefCountedPtr<Subchannel> CreateSubchannel(
const grpc_resolved_address& address, const ChannelArgs& args) {
Crash("unimplemented");
gpr_log(GPR_INFO, "CreateSubchannel: args=%s", args.ToString().c_str());
return Subchannel::Create(MakeOrphanable<TestConnector>(), address, args);
}
};
@ -105,19 +149,77 @@ class ClientChannelTest : public YodelTest {
ClientChannelTest* const test_;
};
class TestResolver final : public Resolver {
public:
explicit TestResolver(
ClientChannelTest* test, ChannelArgs args,
std::unique_ptr<Resolver::ResultHandler> result_handler,
std::shared_ptr<WorkSerializer> work_serializer)
: test_(test),
args_(std::move(args)),
result_handler_(std::move(result_handler)),
work_serializer_(std::move(work_serializer)) {}
void StartLocked() override {
while (!test_->early_resolver_results_.empty()) {
QueueNameResolutionResult(
std::move(test_->early_resolver_results_.front()));
test_->early_resolver_results_.pop();
}
}
void ShutdownLocked() override {}
void QueueNameResolutionResult(Resolver::Result result) {
result.args = result.args.UnionWith(args_);
work_serializer_->Run(
[self = RefAsSubclass<TestResolver>(),
result = std::move(result)]() mutable {
self->result_handler_->ReportResult(std::move(result));
},
DEBUG_LOCATION);
}
private:
ClientChannelTest* const test_;
const ChannelArgs args_;
const std::unique_ptr<Resolver::ResultHandler> result_handler_;
const std::shared_ptr<WorkSerializer> work_serializer_;
};
class TestResolverFactory final : public ResolverFactory {
public:
explicit TestResolverFactory(ClientChannelTest* test) : test_(test) {}
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
CHECK_EQ(args.uri.scheme(), kTestScheme);
CHECK_EQ(args.uri.path(), kTestTarget);
return MakeOrphanable<TestResolver>(test_, std::move(args.args),
std::move(args.result_handler),
std::move(args.work_serializer));
}
absl::string_view scheme() const override { return "test"; }
bool IsValidUri(const URI& uri) const override { return true; }
private:
ClientChannelTest* const test_;
};
ChannelArgs CompleteArgs(const ChannelArgs& args) {
return args.SetObject(&call_destination_factory_)
.SetObject(&client_channel_factory_)
.SetObject(ResourceQuota::Default())
.SetObject(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine()));
.SetObject(std::static_pointer_cast<EventEngine>(event_engine()))
// TODO(ctiller): remove once v3 supports retries?
.SetIfUnset(GRPC_ARG_ENABLE_RETRIES, 0);
}
void InitCoreConfiguration() override {
CoreConfiguration::RegisterBuilder(
);
[this](CoreConfiguration::Builder* builder) {
builder->resolver_registry()->RegisterResolverFactory(
std::make_unique<TestResolverFactory>(this));
});
}
void Shutdown() override {
@ -131,6 +233,10 @@ class ClientChannelTest : public YodelTest {
TestClientChannelFactory client_channel_factory_;
RefCountedPtr<TestCallDestination> call_destination_ =
MakeRefCounted<TestCallDestination>();
// Resolver results that have been reported before the resolver has been
// instantiated.
std::queue<Resolver::Result> early_resolver_results_;
TestResolver* resolver_ = nullptr;
};
#define CLIENT_CHANNEL_TEST(name) YODEL_TEST(ClientChannelTest, name)
@ -145,6 +251,8 @@ CLIENT_CHANNEL_TEST(CreateCall) {
CLIENT_CHANNEL_TEST(StartCall) {
auto& channel = InitChannel(ChannelArgs());
auto call_initiator = channel.CreateCall(MakeClientInitialMetadata());
QueueNameResolutionResult(
MakeSuccessfulResolutionResult("ipv4:127.0.0.1:1234"));
auto call_handler = TickUntilCallStarted();
}

Loading…
Cancel
Save