pull/36414/head
Craig Tiller 7 months ago
parent 803005600c
commit 4ea7e49c7c
  1. 2
      src/core/lib/channel/promise_based_filter.h
  2. 25
      src/core/lib/transport/interception_chain.cc
  3. 2
      src/core/lib/transport/interception_chain.h
  4. 2
      test/core/filters/filter_test.h
  5. 2
      test/core/filters/filter_test_test.cc
  6. 16
      test/cpp/end2end/client_interceptors_end2end_test.cc
  7. 4
      test/cpp/end2end/interceptors_util.cc
  8. 4
      test/cpp/end2end/interceptors_util.h
  9. 2
      test/cpp/end2end/server_interceptors_end2end_test.cc

@ -94,7 +94,7 @@ class ChannelFilter {
// of constructing this object without naming it ===> implicit construction.
// TODO(ctiller): remove this once we're fully on call-v3
// NOLINTNEXTLINE(google-explicit-constructor)
explicit Args(size_t instance_id) : impl_(V3Based{instance_id}) {}
Args(size_t instance_id) : impl_(V3Based{instance_id}) {}
ABSL_DEPRECATED("Direct access to channel stack is deprecated")
grpc_channel_stack* channel_stack() const {

@ -103,22 +103,21 @@ class TerminalInterceptor final : public UnstartedCallDestination {
// InterceptionChain::Builder
void InterceptionChainBuilder::AddInterceptor(
absl::StatusOr<RefCountedPtr<Interceptor>> maybe_interceptor) {
absl::StatusOr<RefCountedPtr<Interceptor>> interceptor) {
if (!status_.ok()) return;
if (!maybe_interceptor.ok()) {
status_ = maybe_interceptor.status();
if (!interceptor.ok()) {
status_ = interceptor.status();
return;
}
auto interceptor = std::move(maybe_interceptor.value());
interceptor->filter_stack_ = MakeFilterStack();
(*interceptor)->filter_stack_ = MakeFilterStack();
if (top_interceptor_ == nullptr) {
top_interceptor_ = std::move(interceptor);
top_interceptor_ = std::move(*interceptor);
} else {
Interceptor* previous = top_interceptor_.get();
while (previous->wrapped_destination_ != nullptr) {
previous = DownCast<Interceptor*>(previous->wrapped_destination_.get());
}
previous->wrapped_destination_ = std::move(interceptor);
previous->wrapped_destination_ = std::move(*interceptor);
}
}
@ -128,32 +127,30 @@ InterceptionChainBuilder::Build(FinalDestination final_destination) {
// Build the final UnstartedCallDestination in the chain - what we do here
// depends on both the type of the final destination and the filters we have
// that haven't been captured into an Interceptor yet.
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> terminator = Match(
RefCountedPtr<UnstartedCallDestination> terminator = Match(
final_destination,
[this](RefCountedPtr<UnstartedCallDestination> final_destination)
-> absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> {
-> RefCountedPtr<UnstartedCallDestination> {
if (stack_builder_.has_value()) {
// TODO(ctiller): consider interjecting a hijacker here
return MakeRefCounted<TerminalInterceptor>(MakeFilterStack(),
final_destination);
}
return final_destination;
},
[this](RefCountedPtr<CallDestination> final_destination)
-> absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> {
-> RefCountedPtr<UnstartedCallDestination> {
return MakeRefCounted<CallStarter>(MakeFilterStack(),
std::move(final_destination));
});
if (!terminator.ok()) return terminator.status();
// Now append the terminator to the interceptor chain.
if (top_interceptor_ == nullptr) {
return std::move(terminator.value());
return std::move(terminator);
}
Interceptor* previous = top_interceptor_.get();
while (previous->wrapped_destination_ != nullptr) {
previous = DownCast<Interceptor*>(previous->wrapped_destination_.get());
}
previous->wrapped_destination_ = std::move(terminator.value());
previous->wrapped_destination_ = std::move(terminator);
return std::move(top_interceptor_);
}

@ -34,7 +34,7 @@ class InterceptionChainBuilder;
// call object above us, the processed metadata from any filters/interceptors
// above us, and also create new CallInterceptor objects that will be handled
// below.
class HijackedCall {
class HijackedCall final {
public:
HijackedCall(ClientMetadataHandle metadata,
RefCountedPtr<UnstartedCallDestination> destination,

@ -114,7 +114,7 @@ class FilterTestBase : public ::testing::Test {
impl_->initial_arena_size = size;
}
Call MakeCallPair();
Call MakeCall();
protected:
explicit Channel(std::unique_ptr<ChannelFilter> filter,

@ -132,7 +132,7 @@ using AddServerInitialMetadataFilterTest =
TEST_F(NoOpFilterTest, NoOp) {}
TEST_F(NoOpFilterTest, MakeCallPair) {
TEST_F(NoOpFilterTest, MakeCall) {
Call call(MakeChannel(ChannelArgs()).value());
}

@ -798,7 +798,7 @@ class ParameterizedClientInterceptorsEnd2endTest
void SendRPC(const std::shared_ptr<Channel>& channel) {
switch (GetParam().rpc_type()) {
case RPCType::kSyncUnary:
MakeCallPair(channel);
MakeCall(channel);
break;
case RPCType::kSyncClientStreaming:
MakeClientStreamingCall(channel);
@ -880,7 +880,7 @@ TEST_F(ClientInterceptorsEnd2endTest,
creators.push_back(std::make_unique<HijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, nullptr, args, std::move(creators));
MakeCallPair(channel);
MakeCall(channel);
}
TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) {
@ -900,7 +900,7 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorHijackingTest) {
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeCallPair(channel);
MakeCall(channel);
// Make sure only 20 phony interceptors were run
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
}
@ -913,7 +913,7 @@ TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLogThenHijackTest) {
creators.push_back(std::make_unique<HijackingInterceptorFactory>());
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeCallPair(channel);
MakeCall(channel);
LoggingInterceptor::VerifyUnaryCall();
}
@ -938,7 +938,7 @@ TEST_F(ClientInterceptorsEnd2endTest,
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
MakeCallPair(channel, StubOptions("TestSuffixForStats"));
MakeCall(channel, StubOptions("TestSuffixForStats"));
// Make sure all interceptors were run once, since the hijacking interceptor
// makes an RPC on the intercepted channel
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 12);
@ -1199,7 +1199,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, PhonyGlobalInterceptor) {
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeCallPair(channel);
MakeCall(channel);
// Make sure all 20 phony interceptors were run with the global interceptor
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 21);
experimental::TestOnlyResetGlobalClientInterceptorFactory();
@ -1222,7 +1222,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, LoggingGlobalInterceptor) {
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeCallPair(channel);
MakeCall(channel);
LoggingInterceptor::VerifyUnaryCall();
// Make sure all 20 phony interceptors were run
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
@ -1246,7 +1246,7 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, HijackingGlobalInterceptor) {
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
MakeCallPair(channel);
MakeCall(channel);
// Make sure all 20 phony interceptors were run
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
experimental::TestOnlyResetGlobalClientInterceptorFactory();

@ -30,8 +30,8 @@ std::atomic<int> PhonyInterceptor::num_times_run_;
std::atomic<int> PhonyInterceptor::num_times_run_reverse_;
std::atomic<int> PhonyInterceptor::num_times_cancel_;
void MakeCallPair(const std::shared_ptr<Channel>& channel,
const StubOptions& options) {
void MakeCall(const std::shared_ptr<Channel>& channel,
const StubOptions& options) {
auto stub = grpc::testing::EchoTestService::NewStub(channel, options);
ClientContext ctx;
EchoRequest req;

@ -207,8 +207,8 @@ class EchoTestServiceStreamingImpl : public EchoTestService::Service {
constexpr int kNumStreamingMessages = 10;
void MakeCallPair(const std::shared_ptr<Channel>& channel,
const StubOptions& options = StubOptions());
void MakeCall(const std::shared_ptr<Channel>& channel,
const StubOptions& options = StubOptions());
void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel);

@ -265,7 +265,7 @@ TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
PhonyInterceptor::Reset();
auto channel =
grpc::CreateChannel(server_address_, InsecureChannelCredentials());
MakeCallPair(channel);
MakeCall(channel);
// Make sure all 20 phony interceptors were run
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
}

Loading…
Cancel
Save