Call: Send cancel op down the stack even when no ops are sent (#30004)

* Call: Send cancel op down the stack even when no ops are sent

* Add test
pull/29869/head
Yash Tibrewal 2 years ago committed by GitHub
parent a1e1d34f36
commit 78d6d71af3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      src/core/lib/surface/call.cc
  2. 25
      test/cpp/end2end/client_interceptors_end2end_test.cc
  3. 3
      test/cpp/end2end/interceptors_util.cc

@ -367,7 +367,6 @@ class FilterStackCall final : public Call {
bool received_initial_metadata_ = false;
bool receiving_message_ = false;
bool requested_final_op_ = false;
gpr_atm any_ops_sent_atm_ = 0;
gpr_atm received_final_op_atm_ = 0;
BatchControl* active_batches_[kMaxConcurrentBatches] = {};
@ -707,8 +706,7 @@ void FilterStackCall::ExternalUnref() {
GPR_ASSERT(!destroy_called_);
destroy_called_ = true;
bool cancel = gpr_atm_acq_load(&any_ops_sent_atm_) != 0 &&
gpr_atm_acq_load(&received_final_op_atm_) == 0;
bool cancel = gpr_atm_acq_load(&received_final_op_atm_) == 0;
if (cancel) {
CancelWithError(GRPC_ERROR_CANCELLED);
} else {
@ -1773,7 +1771,6 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
stream_op->on_complete = &bctl->finish_batch_;
}
gpr_atm_rel_store(&any_ops_sent_atm_, 1);
ExecuteBatch(stream_op, &bctl->start_batch_);
done:

@ -983,6 +983,26 @@ TEST_F(ClientInterceptorsCallbackEnd2endTest,
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
}
TEST_F(ClientInterceptorsCallbackEnd2endTest,
ClientInterceptorHijackingTestWithCallback) {
ChannelArguments args;
PhonyInterceptor::Reset();
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
creators;
creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
// Add 20 phony interceptors
for (auto i = 0; i < 20; i++) {
creators.push_back(absl::make_unique<PhonyInterceptorFactory>());
}
creators.push_back(absl::make_unique<HijackingInterceptorFactory>());
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
MakeCallbackCall(channel);
LoggingInterceptor::VerifyUnaryCall();
// Make sure all 20 phony interceptors were run
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 20);
}
TEST_F(ClientInterceptorsCallbackEnd2endTest,
ClientInterceptorFactoryAllowsNullptrReturn) {
ChannelArguments args;
@ -1240,5 +1260,8 @@ TEST_F(ClientGlobalInterceptorEnd2endTest, HijackingGlobalInterceptor) {
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
int ret = RUN_ALL_TESTS();
// Make sure that gRPC shuts down cleanly
GPR_ASSERT(grpc_wait_until_shutdown(1));
return ret;
}

@ -20,6 +20,8 @@
#include "absl/memory/memory.h"
#include "test/core/util/test_config.h"
namespace grpc {
namespace testing {
@ -157,6 +159,7 @@ void MakeAsyncCQBidiStreamingCall(const std::shared_ptr<Channel>& /*channel*/) {
void MakeCallbackCall(const std::shared_ptr<Channel>& channel) {
auto stub = grpc::testing::EchoTestService::NewStub(channel);
ClientContext ctx;
ctx.set_deadline(grpc_timeout_milliseconds_to_deadline(20000));
EchoRequest req;
std::mutex mu;
std::condition_variable cv;

Loading…
Cancel
Save