Merge pull request #5572 from vjpai/srv_ctx

Properly integrate async API with server-side cancellations.
pull/5599/head
Sree Kuchibhotla 9 years ago
commit 22e53cbac6
  1. 1
      include/grpc++/impl/codegen/completion_queue.h
  2. 3
      include/grpc++/impl/codegen/server_context.h
  3. 26
      src/cpp/server/server_context.cc
  4. 232
      test/cpp/end2end/async_end2end_test.cc

@ -184,6 +184,7 @@ class CompletionQueue : private GrpcLibrary {
bool Pluck(CompletionQueueTag* tag); bool Pluck(CompletionQueueTag* tag);
/// Performs a single polling pluck on \a tag. /// Performs a single polling pluck on \a tag.
/// \warning Must not be mixed with calls to \a Next.
void TryPluck(CompletionQueueTag* tag); void TryPluck(CompletionQueueTag* tag);
grpc_completion_queue* cq_; // owned grpc_completion_queue* cq_; // owned

@ -103,6 +103,9 @@ class ServerContext {
void AddInitialMetadata(const grpc::string& key, const grpc::string& value); void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
// IsCancelled is always safe to call when using sync API
// When using async API, it is only safe to call IsCancelled after
// the AsyncNotifyWhenDone tag has been delivered
bool IsCancelled() const; bool IsCancelled() const;
// Cancel the Call from the server. This is a best-effort API and depending on // Cancel the Call from the server. This is a best-effort API and depending on

@ -62,7 +62,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq); bool CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
return CheckCancelledNoPluck();
}
bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
void set_tag(void* tag) { void set_tag(void* tag) {
has_tag_ = true; has_tag_ = true;
@ -72,6 +76,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
void Unref(); void Unref();
private: private:
bool CheckCancelledNoPluck() {
grpc::lock_guard<grpc::mutex> g(mu_);
return finalized_ ? (cancelled_ != 0) : false;
}
bool has_tag_; bool has_tag_;
void* tag_; void* tag_;
grpc::mutex mu_; grpc::mutex mu_;
@ -88,12 +97,6 @@ void ServerContext::CompletionOp::Unref() {
} }
} }
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
return finalized_ ? cancelled_ != 0 : false;
}
void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_; ops->data.recv_close_on_server.cancelled = &cancelled_;
@ -182,7 +185,14 @@ void ServerContext::TryCancel() const {
} }
bool ServerContext::IsCancelled() const { bool ServerContext::IsCancelled() const {
return completion_op_ && completion_op_->CheckCancelled(cq_); if (has_notify_when_done_tag_) {
// when using async API, but the result is only valid
// if the tag has already been delivered at the completion queue
return completion_op_ && completion_op_->CheckCancelledAsync();
} else {
// when using sync API
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
} }
void ServerContext::set_compression_level(grpc_compression_level level) { void ServerContext::set_compression_level(grpc_compression_level level) {

@ -68,6 +68,7 @@ namespace testing {
namespace { namespace {
void* tag(int i) { return (void*)(intptr_t)i; } void* tag(int i) { return (void*)(intptr_t)i; }
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
#ifdef GPR_POSIX_SOCKET #ifdef GPR_POSIX_SOCKET
static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, static int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
@ -106,37 +107,50 @@ class PollingOverrider {
class Verifier { class Verifier {
public: public:
explicit Verifier(bool spin) : spin_(spin) {} explicit Verifier(bool spin) : spin_(spin) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) { Verifier& Expect(int i, bool expect_ok) {
expectations_[tag(i)] = expect_ok; expectations_[tag(i)] = expect_ok;
return *this; return *this;
} }
// Next waits for 1 async tag to complete, checks its
// expectations, and returns the tag
int Next(CompletionQueue* cq, bool ignore_ok) {
bool ok;
void* got_tag;
if (spin_) {
for (;;) {
auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_TRUE(cq->Next(&got_tag, &ok));
}
auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end());
if (!ignore_ok) {
EXPECT_EQ(it->second, ok);
}
expectations_.erase(it);
return detag(got_tag);
}
// Verify keeps calling Next until all currently set
// expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); } void Verify(CompletionQueue* cq) { Verify(cq, false); }
// This version of Verify allows optionally ignoring the
// outcome of the expectation
void Verify(CompletionQueue* cq, bool ignore_ok) { void Verify(CompletionQueue* cq, bool ignore_ok) {
GPR_ASSERT(!expectations_.empty()); GPR_ASSERT(!expectations_.empty());
while (!expectations_.empty()) { while (!expectations_.empty()) {
bool ok; Next(cq, ignore_ok);
void* got_tag;
if (spin_) {
for (;;) {
auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_TRUE(cq->Next(&got_tag, &ok));
}
auto it = expectations_.find(got_tag);
EXPECT_TRUE(it != expectations_.end());
if (!ignore_ok) {
EXPECT_EQ(it->second, ok);
}
expectations_.erase(it);
} }
} }
// This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq, void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) { std::chrono::system_clock::time_point deadline) {
if (expectations_.empty()) { if (expectations_.empty()) {
@ -793,7 +807,8 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
} }
// This class is for testing scenarios where RPCs are cancelled on the server // This class is for testing scenarios where RPCs are cancelled on the server
// by calling ServerContext::TryCancel() // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
// API to check for cancellation
class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
protected: protected:
typedef enum { typedef enum {
@ -803,13 +818,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
CANCEL_AFTER_PROCESSING CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase; } ServerTryCancelRequestPhase;
void ServerTryCancel(ServerContext* context) {
EXPECT_FALSE(context->IsCancelled());
context->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancel()");
EXPECT_TRUE(context->IsCancelled());
}
// Helper for testing client-streaming RPCs which are cancelled on the server. // Helper for testing client-streaming RPCs which are cancelled on the server.
// Depending on the value of server_try_cancel parameter, this will test one // Depending on the value of server_try_cancel parameter, this will test one
// of the following three scenarios: // of the following three scenarios:
@ -843,6 +851,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls // On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client // and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@ -858,9 +867,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_server_cq_result = true; bool expected_server_cq_result = true;
bool ignore_cq_result = false; bool ignore_cq_result = false;
bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(&srv_ctx); srv_ctx.TryCancel();
Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know // Since cancellation is done before server reads any results, we know
// for sure that all cq results will return false from this point forward // for sure that all cq results will return false from this point forward
@ -868,22 +880,39 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
} }
std::thread* server_try_cancel_thd = NULL; std::thread* server_try_cancel_thd = NULL;
auto verif = Verifier(GetParam());
if (server_try_cancel == CANCEL_DURING_PROCESSING) { if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = new std::thread( server_try_cancel_thd =
&AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); new std::thread(&ServerContext::TryCancel, &srv_ctx);
// Server will cancel the RPC in a parallel thread while reading the // Server will cancel the RPC in a parallel thread while reading the
// requests from the client. Since the cancellation can happen at anytime, // requests from the client. Since the cancellation can happen at anytime,
// some of the cq results (i.e those until cancellation) might be true but // some of the cq results (i.e those until cancellation) might be true but
// its non deterministic. So better to ignore the cq results // its non deterministic. So better to ignore the cq results
ignore_cq_result = true; ignore_cq_result = true;
// Expect that we might possibly see the done tag that
// indicates cancellation completion in this case
want_done_tag = true;
verif.Expect(11, true);
} }
// Server reads 3 messages (tags 6, 7 and 8) // Server reads 3 messages (tags 6, 7 and 8)
// But if want_done_tag is true, we might also see tag 11
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
srv_stream.Read(&recv_request, tag(tag_idx)); srv_stream.Read(&recv_request, tag(tag_idx));
Verifier(GetParam()) // Note that we'll add something to the verifier and verify that
.Expect(tag_idx, expected_server_cq_result) // something was seen, but it might be tag 11 and not what we
.Verify(cq_.get(), ignore_cq_result); // just added
int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
}
} }
if (server_try_cancel_thd != NULL) { if (server_try_cancel_thd != NULL) {
@ -892,7 +921,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
} }
if (server_try_cancel == CANCEL_AFTER_PROCESSING) { if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx); srv_ctx.TryCancel();
want_done_tag = true;
verif.Expect(11, true);
}
if (want_done_tag) {
verif.Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
} }
// The RPC has been cancelled at this point for sure (i.e irrespective of // The RPC has been cancelled at this point for sure (i.e irrespective of
@ -945,6 +982,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
Verifier(GetParam()).Expect(1, true).Verify(cq_.get()); Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and // On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client // receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@ -952,9 +990,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_cq_result = true; bool expected_cq_result = true;
bool ignore_cq_result = false; bool ignore_cq_result = false;
bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(&srv_ctx); srv_ctx.TryCancel();
Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point // We know for sure that all cq results will be false from this point
// since the server cancelled the RPC // since the server cancelled the RPC
@ -962,24 +1003,41 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
} }
std::thread* server_try_cancel_thd = NULL; std::thread* server_try_cancel_thd = NULL;
auto verif = Verifier(GetParam());
if (server_try_cancel == CANCEL_DURING_PROCESSING) { if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = new std::thread( server_try_cancel_thd =
&AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); new std::thread(&ServerContext::TryCancel, &srv_ctx);
// Server will cancel the RPC in a parallel thread while writing responses // Server will cancel the RPC in a parallel thread while writing responses
// to the client. Since the cancellation can happen at anytime, some of // to the client. Since the cancellation can happen at anytime, some of
// the cq results (i.e those until cancellation) might be true but it is // the cq results (i.e those until cancellation) might be true but it is
// non deterministic. So better to ignore the cq results // non deterministic. So better to ignore the cq results
ignore_cq_result = true; ignore_cq_result = true;
// Expect that we might possibly see the done tag that
// indicates cancellation completion in this case
want_done_tag = true;
verif.Expect(11, true);
} }
// Server sends three messages (tags 3, 4 and 5) // Server sends three messages (tags 3, 4 and 5)
// But if want_done tag is true, we might also see tag 11
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_response.set_message("Pong " + std::to_string(tag_idx)); send_response.set_message("Pong " + std::to_string(tag_idx));
srv_stream.Write(send_response, tag(tag_idx)); srv_stream.Write(send_response, tag(tag_idx));
Verifier(GetParam()) // Note that we'll add something to the verifier and verify that
.Expect(tag_idx, expected_cq_result) // something was seen, but it might be tag 11 and not what we
.Verify(cq_.get(), ignore_cq_result); // just added
int got_tag = verif.Expect(tag_idx, expected_cq_result)
.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
}
} }
if (server_try_cancel_thd != NULL) { if (server_try_cancel_thd != NULL) {
@ -988,13 +1046,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
} }
if (server_try_cancel == CANCEL_AFTER_PROCESSING) { if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx); srv_ctx.TryCancel();
want_done_tag = true;
verif.Expect(11, true);
// Client reads may fail bacause it is notified that the stream is // Client reads may fail bacause it is notified that the stream is
// cancelled. // cancelled.
ignore_cq_result = true; ignore_cq_result = true;
} }
if (want_done_tag) {
verif.Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
}
// Client attemts to read the three messages from the server // Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx)); cli_stream->Read(&recv_response, tag(tag_idx));
@ -1052,6 +1118,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and // On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client // receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get()); Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
@ -1063,9 +1130,12 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool expected_cq_result = true; bool expected_cq_result = true;
bool ignore_cq_result = false; bool ignore_cq_result = false;
bool want_done_tag = false;
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
ServerTryCancel(&srv_ctx); srv_ctx.TryCancel();
Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point // We know for sure that all cq results will be false from this point
// since the server cancelled the RPC // since the server cancelled the RPC
@ -1073,42 +1143,84 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
} }
std::thread* server_try_cancel_thd = NULL; std::thread* server_try_cancel_thd = NULL;
auto verif = Verifier(GetParam());
if (server_try_cancel == CANCEL_DURING_PROCESSING) { if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = new std::thread( server_try_cancel_thd =
&AsyncEnd2endServerTryCancelTest::ServerTryCancel, this, &srv_ctx); new std::thread(&ServerContext::TryCancel, &srv_ctx);
// Since server is going to cancel the RPC in a parallel thread, some of // Since server is going to cancel the RPC in a parallel thread, some of
// the cq results (i.e those until the cancellation) might be true. Since // the cq results (i.e those until the cancellation) might be true. Since
// that number is non-deterministic, it is better to ignore the cq results // that number is non-deterministic, it is better to ignore the cq results
ignore_cq_result = true; ignore_cq_result = true;
// Expect that we might possibly see the done tag that
// indicates cancellation completion in this case
want_done_tag = true;
verif.Expect(11, true);
} }
int got_tag;
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam()) verif.Expect(4, expected_cq_result);
.Expect(4, expected_cq_result) got_tag = verif.Next(cq_.get(), ignore_cq_result);
.Verify(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4);
}
send_response.set_message("Pong"); send_response.set_message("Pong");
srv_stream.Write(send_response, tag(5)); srv_stream.Write(send_response, tag(5));
Verifier(GetParam()) verif.Expect(5, expected_cq_result);
.Expect(5, expected_cq_result) got_tag = verif.Next(cq_.get(), ignore_cq_result);
.Verify(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5);
}
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam()) verif.Expect(6, expected_cq_result);
.Expect(6, expected_cq_result) got_tag = verif.Next(cq_.get(), ignore_cq_result);
.Verify(cq_.get(), ignore_cq_result); GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6);
}
// This is expected to succeed in all cases // This is expected to succeed in all cases
cli_stream->WritesDone(tag(7)); cli_stream->WritesDone(tag(7));
Verifier(GetParam()).Expect(7, true).Verify(cq_.get()); verif.Expect(7, true);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 7);
}
// This is expected to fail in all cases i.e for all values of // This is expected to fail in all cases i.e for all values of
// server_try_cancel. This is because at this point, either there are no // server_try_cancel. This is because at this point, either there are no
// more msgs from the client (because client called WritesDone) or the RPC // more msgs from the client (because client called WritesDone) or the RPC
// is cancelled on the server // is cancelled on the server
srv_stream.Read(&recv_request, tag(8)); srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam()).Expect(8, false).Verify(cq_.get()); verif.Expect(8, false);
got_tag = verif.Next(cq_.get(), ignore_cq_result);
GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
if (got_tag == 11) {
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
// Now get the other entry that we were waiting on
EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
}
if (server_try_cancel_thd != NULL) { if (server_try_cancel_thd != NULL) {
server_try_cancel_thd->join(); server_try_cancel_thd->join();
@ -1116,7 +1228,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
} }
if (server_try_cancel == CANCEL_AFTER_PROCESSING) { if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
ServerTryCancel(&srv_ctx); srv_ctx.TryCancel();
want_done_tag = true;
verif.Expect(11, true);
}
if (want_done_tag) {
verif.Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
want_done_tag = false;
} }
// The RPC has been cancelled at this point for sure (i.e irrespective of // The RPC has been cancelled at this point for sure (i.e irrespective of

Loading…
Cancel
Save