Remove spinning version

pull/14396/head
Vijay Pai 7 years ago
parent 98b6ec406b
commit 1e6a620b50
  1. 408
      test/cpp/end2end/async_end2end_test.cc

@ -61,7 +61,7 @@ int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
class Verifier { class Verifier {
public: public:
explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} Verifier() : lambda_run_(false) {}
// Expect sets the expected ok value for a specific tag // Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) { Verifier& Expect(int i, bool expect_ok) {
return ExpectUnless(i, expect_ok, false); return ExpectUnless(i, expect_ok, false);
@ -89,17 +89,7 @@ class Verifier {
int Next(CompletionQueue* cq, bool ignore_ok) { int Next(CompletionQueue* cq, bool ignore_ok) {
bool ok; bool ok;
void* got_tag; void* got_tag;
if (spin_) { EXPECT_TRUE(cq->Next(&got_tag, &ok));
for (;;) {
auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_TRUE(cq->Next(&got_tag, &ok));
}
GotTag(got_tag, ok, ignore_ok); GotTag(got_tag, ok, ignore_ok);
return detag(got_tag); return detag(got_tag);
} }
@ -135,34 +125,14 @@ class Verifier {
if (expectations_.empty()) { if (expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
if (spin_) { EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
while (std::chrono::system_clock::now() < deadline) { CompletionQueue::TIMEOUT);
EXPECT_EQ(
cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
CompletionQueue::TIMEOUT);
}
} else {
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
CompletionQueue::TIMEOUT);
}
} else { } else {
while (!expectations_.empty()) { while (!expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
if (spin_) { EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
for (;;) { CompletionQueue::GOT_EVENT);
GPR_ASSERT(std::chrono::system_clock::now() < deadline);
auto r =
cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
CompletionQueue::GOT_EVENT);
}
GotTag(got_tag, ok, false); GotTag(got_tag, ok, false);
} }
} }
@ -177,33 +147,14 @@ class Verifier {
if (expectations_.empty()) { if (expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
if (spin_) { EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
while (std::chrono::system_clock::now() < deadline) { CompletionQueue::TIMEOUT);
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
CompletionQueue::TIMEOUT);
}
} else {
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
CompletionQueue::TIMEOUT);
}
} else { } else {
while (!expectations_.empty()) { while (!expectations_.empty()) {
bool ok; bool ok;
void* got_tag; void* got_tag;
if (spin_) { EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
for (;;) { CompletionQueue::GOT_EVENT);
GPR_ASSERT(std::chrono::system_clock::now() < deadline);
auto r = DoOnceThenAsyncNext(
cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
if (r == CompletionQueue::TIMEOUT) continue;
if (r == CompletionQueue::GOT_EVENT) break;
gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
abort();
}
} else {
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
CompletionQueue::GOT_EVENT);
}
GotTag(got_tag, ok, false); GotTag(got_tag, ok, false);
} }
} }
@ -241,7 +192,6 @@ class Verifier {
std::map<void*, bool> expectations_; std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_; std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_;
bool lambda_run_; bool lambda_run_;
}; };
@ -267,15 +217,13 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
class TestScenario { class TestScenario {
public: public:
TestScenario(bool non_block, bool inproc_stub, const grpc::string& creds_type, TestScenario(bool inproc_stub, const grpc::string& creds_type, bool hcs,
bool hcs, const grpc::string& content) const grpc::string& content)
: disable_blocking(non_block), : inproc(inproc_stub),
inproc(inproc_stub),
health_check_service(hcs), health_check_service(hcs),
credentials_type(creds_type), credentials_type(creds_type),
message_content(content) {} message_content(content) {}
void Log() const; void Log() const;
bool disable_blocking;
bool inproc; bool inproc;
bool health_check_service; bool health_check_service;
const grpc::string credentials_type; const grpc::string credentials_type;
@ -284,9 +232,7 @@ class TestScenario {
static std::ostream& operator<<(std::ostream& out, static std::ostream& operator<<(std::ostream& out,
const TestScenario& scenario) { const TestScenario& scenario) {
return out << "TestScenario{disable_blocking=" return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
<< (scenario.disable_blocking ? "true" : "false")
<< ", inproc=" << (scenario.inproc ? "true" : "false")
<< ", credentials='" << scenario.credentials_type << ", credentials='" << scenario.credentials_type
<< ", health_check_service=" << ", health_check_service="
<< (scenario.health_check_service ? "true" : "false") << (scenario.health_check_service ? "true" : "false")
@ -374,16 +320,13 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -487,24 +430,20 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now()); std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10)); std::chrono::system_clock::now() + std::chrono::seconds(10));
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); Verifier().Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); Verifier().Verify(cq_.get(), time_now);
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
.Expect(2, true)
.Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(
.Expect(3, true) cq_.get(), std::chrono::system_clock::time_point::max());
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -532,8 +471,8 @@ TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
std::chrono::system_clock::now()); std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit( std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10)); std::chrono::system_clock::now() + std::chrono::seconds(10));
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); Verifier().Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); Verifier().Verify(cq_.get(), time_now);
auto resp_writer_ptr = &response_writer; auto resp_writer_ptr = &response_writer;
auto lambda_2 = [&, this, resp_writer_ptr]() { auto lambda_2 = [&, this, resp_writer_ptr]() {
@ -542,9 +481,7 @@ TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
cq_.get(), tag(2)); cq_.get(), tag(2));
}; };
Verifier(GetParam().disable_blocking) Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
.Expect(2, true)
.Verify(cq_.get(), time_limit, lambda_2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto recv_resp_ptr = &recv_response; auto recv_resp_ptr = &recv_response;
@ -554,11 +491,8 @@ TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
}; };
response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(
.Expect(3, true) cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
lambda_3);
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -584,41 +518,26 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
.Expect(2, true)
.Expect(1, true)
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3)); cli_stream->Write(send_request, tag(3));
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5)); cli_stream->Write(send_request, tag(5));
srv_stream.Read(&recv_request, tag(6)); srv_stream.Read(&recv_request, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7)); cli_stream->WritesDone(tag(7));
srv_stream.Read(&recv_request, tag(8)); srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam().disable_blocking) Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9)); srv_stream.Finish(send_response, Status::OK, tag(9));
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking) Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
.Expect(9, true)
.Expect(10, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -650,38 +569,26 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
bool seen3 = false; bool seen3 = false;
Verifier(GetParam().disable_blocking) Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
.Expect(2, true)
.ExpectMaybe(3, true, &seen3)
.Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking) Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
.ExpectUnless(3, true, seen3)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WriteLast(send_request, WriteOptions(), tag(5)); cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
srv_stream.Read(&recv_request, tag(6)); srv_stream.Read(&recv_request, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_stream.Read(&recv_request, tag(7)); srv_stream.Read(&recv_request, tag(7));
Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); Verifier().Expect(7, false).Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(8)); srv_stream.Finish(send_response, Status::OK, tag(8));
cli_stream->Finish(&recv_status, tag(9)); cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking) Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
.Expect(8, true)
.Expect(9, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -707,38 +614,26 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3)); srv_stream.Write(send_response, tag(3));
cli_stream->Read(&recv_response, tag(4)); cli_stream->Read(&recv_response, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5)); srv_stream.Write(send_response, tag(5));
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7)); srv_stream.Finish(Status::OK, tag(7));
cli_stream->Read(&recv_response, tag(8)); cli_stream->Read(&recv_response, tag(8));
Verifier(GetParam().disable_blocking) Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9)); cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); Verifier().Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
@ -763,34 +658,25 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3)); srv_stream.Write(send_response, tag(3));
cli_stream->Read(&recv_response, tag(4)); cli_stream->Read(&recv_response, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5)); srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->Read(&recv_response, tag(7)); cli_stream->Read(&recv_response, tag(7));
Verifier(GetParam().disable_blocking).Expect(7, false).Verify(cq_.get()); Verifier().Expect(7, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(8)); cli_stream->Finish(&recv_status, tag(8));
Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); Verifier().Expect(8, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
@ -815,36 +701,26 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3)); srv_stream.Write(send_response, tag(3));
cli_stream->Read(&recv_response, tag(4)); cli_stream->Read(&recv_response, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.WriteLast(send_response, WriteOptions(), tag(5)); srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
srv_stream.Finish(Status::OK, tag(7)); srv_stream.Finish(Status::OK, tag(7));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Expect(7, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->Read(&recv_response, tag(8)); cli_stream->Read(&recv_response, tag(8));
Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get()); Verifier().Expect(8, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9)); cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); Verifier().Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
@ -869,41 +745,26 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3)); cli_stream->Write(send_request, tag(3));
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5)); srv_stream.Write(send_response, tag(5));
cli_stream->Read(&recv_response, tag(6)); cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7)); cli_stream->WritesDone(tag(7));
srv_stream.Read(&recv_request, tag(8)); srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam().disable_blocking) Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9)); srv_stream.Finish(Status::OK, tag(9));
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking) Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
.Expect(9, true)
.Expect(10, true)
.Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
@ -933,33 +794,24 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
bool seen3 = false; bool seen3 = false;
Verifier(GetParam().disable_blocking) Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
.Expect(2, true)
.ExpectMaybe(3, true, &seen3)
.Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking) Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
.ExpectUnless(3, true, seen3)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_stream.Read(&recv_request, tag(5)); srv_stream.Read(&recv_request, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); Verifier().Expect(5, false).Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6)); srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
cli_stream->Read(&recv_response, tag(7)); cli_stream->Read(&recv_response, tag(7));
Verifier(GetParam().disable_blocking) Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
.Expect(6, true)
.Expect(7, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->Finish(&recv_status, tag(8)); cli_stream->Finish(&recv_status, tag(8));
Verifier(GetParam().disable_blocking).Expect(8, true).Verify(cq_.get()); Verifier().Expect(8, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
@ -989,35 +841,25 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
bool seen3 = false; bool seen3 = false;
Verifier(GetParam().disable_blocking) Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
.Expect(2, true)
.ExpectMaybe(3, true, &seen3)
.Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4)); srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking) Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
.ExpectUnless(3, true, seen3)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_stream.Read(&recv_request, tag(5)); srv_stream.Read(&recv_request, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, false).Verify(cq_.get()); Verifier().Expect(5, false).Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_stream.WriteLast(send_response, WriteOptions(), tag(6)); srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
srv_stream.Finish(Status::OK, tag(7)); srv_stream.Finish(Status::OK, tag(7));
cli_stream->Read(&recv_response, tag(8)); cli_stream->Read(&recv_response, tag(8));
Verifier(GetParam().disable_blocking) Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
.Expect(6, true)
.Expect(7, true)
.Expect(8, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->Finish(&recv_status, tag(9)); cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get()); Verifier().Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
@ -1049,7 +891,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, EXPECT_EQ(meta1.second,
@ -1063,10 +905,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -1094,15 +933,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); Verifier().Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4)); response_reader->ReadInitialMetadata(tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); Verifier().Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, EXPECT_EQ(meta1.second,
ToString(server_initial_metadata.find(meta1.first)->second)); ToString(server_initial_metadata.find(meta1.first)->second));
@ -1113,10 +952,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5)); response_writer.Finish(send_response, Status::OK, tag(5));
response_reader->Finish(&recv_response, &recv_status, tag(6)); response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -1144,10 +980,10 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); Verifier().Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
@ -1155,10 +991,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
response_writer.Finish(send_response, Status::OK, tag(4)); response_writer.Finish(send_response, Status::OK, tag(4));
response_reader->Finish(&recv_response, &recv_status, tag(5)); response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier(GetParam().disable_blocking) Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
.Expect(4, true)
.Expect(5, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -1207,7 +1040,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata(); auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, EXPECT_EQ(meta1.second,
@ -1219,9 +1052,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); Verifier().Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4)); response_reader->ReadInitialMetadata(tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); Verifier().Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, EXPECT_EQ(meta3.second,
ToString(server_initial_metadata.find(meta3.first)->second)); ToString(server_initial_metadata.find(meta3.first)->second));
@ -1235,10 +1068,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
response_writer.Finish(send_response, Status::OK, tag(5)); response_writer.Finish(send_response, Status::OK, tag(5));
response_reader->Finish(&recv_response, &recv_status, tag(6)); response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam().disable_blocking) Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
@ -1272,15 +1102,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel(); cli_ctx.TryCancel();
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get()); Verifier().Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled()); EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); Verifier().Expect(4, true).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code()); EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
} }
@ -1307,17 +1137,13 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(), service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2)); cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); Verifier().Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking) Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
.Expect(3, true)
.Expect(4, true)
.Expect(5, true)
.Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled()); EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_EQ(send_response.message(), recv_response.message());
@ -1344,7 +1170,7 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get())); stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4)); response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get()); Verifier().Expect(4, true).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code()); EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message()); EXPECT_EQ("", recv_status.error_message());
@ -1398,10 +1224,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
srv_ctx.AsyncNotifyWhenDone(tag(11)); srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
std::thread t1([this, &cli_cq] { std::thread t1(
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq); [this, &cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
}); Verifier().Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
t1.join(); t1.join();
bool expected_server_cq_result = true; bool expected_server_cq_result = true;
@ -1409,7 +1234,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel(); srv_ctx.TryCancel();
Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); Verifier().Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled()); EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know // Since cancellation is done before server reads any results, we know
@ -1430,13 +1255,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + grpc::to_string(tag_idx)); send_request.set_message("Ping " + grpc::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx)); cli_stream->Write(send_request, tag(tag_idx));
Verifier(GetParam().disable_blocking) Verifier()
.Expect(tag_idx, expected_client_cq_result) .Expect(tag_idx, expected_client_cq_result)
.Verify(&cli_cq, ignore_client_cq_result); .Verify(&cli_cq, ignore_client_cq_result);
} }
cli_stream->WritesDone(tag(6)); cli_stream->WritesDone(tag(6));
// Ignore ok on WritesDone since cancel can affect it // Ignore ok on WritesDone since cancel can affect it
Verifier(GetParam().disable_blocking) Verifier()
.Expect(6, expected_client_cq_result) .Expect(6, expected_client_cq_result)
.Verify(&cli_cq, ignore_client_cq_result); .Verify(&cli_cq, ignore_client_cq_result);
}); });
@ -1445,7 +1270,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
bool want_done_tag = false; bool want_done_tag = false;
std::thread* server_try_cancel_thd = nullptr; std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking); auto verif = Verifier();
if (server_try_cancel == CANCEL_DURING_PROCESSING) { if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = server_try_cancel_thd =
@ -1505,11 +1330,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server sends the final message and cancelled status (but the RPC is // Server sends the final message and cancelled status (but the RPC is
// already cancelled at this point. So we expect the operation to fail) // already cancelled at this point. So we expect the operation to fail)
srv_stream.Finish(send_response, Status::CANCELLED, tag(9)); srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); Verifier().Expect(9, false).Verify(cq_.get());
// Client will see the cancellation // Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq); Verifier().Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok()); EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
@ -1555,10 +1380,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2)); cq_.get(), cq_.get(), tag(2));
std::thread t1([this, &cli_cq] { std::thread t1(
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq); [this, &cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
}); Verifier().Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
t1.join(); t1.join();
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
@ -1572,7 +1396,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel(); srv_ctx.TryCancel();
Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); Verifier().Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled()); EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point // We know for sure that all cq results will be false from this point
@ -1587,7 +1411,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
EchoResponse recv_response; EchoResponse recv_response;
cli_stream->Read(&recv_response, tag(tag_idx)); cli_stream->Read(&recv_response, tag(tag_idx));
Verifier(GetParam().disable_blocking) Verifier()
.Expect(tag_idx, expected_client_cq_result) .Expect(tag_idx, expected_client_cq_result)
.Verify(&cli_cq, ignore_client_cq_result); .Verify(&cli_cq, ignore_client_cq_result);
} }
@ -1595,7 +1419,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = nullptr; std::thread* server_try_cancel_thd = nullptr;
auto verif = Verifier(GetParam().disable_blocking); auto verif = Verifier();
if (server_try_cancel == CANCEL_DURING_PROCESSING) { if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd = server_try_cancel_thd =
@ -1656,11 +1480,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server finishes the stream (but the RPC is already cancelled) // Server finishes the stream (but the RPC is already cancelled)
srv_stream.Finish(Status::CANCELLED, tag(9)); srv_stream.Finish(Status::CANCELLED, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); Verifier().Expect(9, false).Verify(cq_.get());
// Client will see the cancellation // Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq); Verifier().Expect(10, true).Verify(&cli_cq);
EXPECT_FALSE(recv_status.ok()); EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
@ -1707,12 +1531,9 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
srv_ctx.AsyncNotifyWhenDone(tag(11)); srv_ctx.AsyncNotifyWhenDone(tag(11));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2)); tag(2));
Verifier(GetParam().disable_blocking) Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
auto verif = Verifier(GetParam().disable_blocking); auto verif = Verifier();
// Client sends the first and the only message // Client sends the first and the only message
send_request.set_message("Ping"); send_request.set_message("Ping");
@ -1852,10 +1673,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// know that cq results are supposed to return false on server. // know that cq results are supposed to return false on server.
srv_stream.Finish(Status::CANCELLED, tag(9)); srv_stream.Finish(Status::CANCELLED, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get()); Verifier().Expect(9, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10)); cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); Verifier().Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok()); EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code()); EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
} }
@ -1897,8 +1718,7 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING); TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
} }
std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking, std::vector<TestScenario> CreateTestScenarios(bool test_secure,
bool test_secure,
int test_big_limit) { int test_big_limit) {
std::vector<TestScenario> scenarios; std::vector<TestScenario> scenarios;
std::vector<grpc::string> credentials_types; std::vector<grpc::string> credentials_types;
@ -1936,14 +1756,10 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
for (auto msg = messages.begin(); msg != messages.end(); msg++) { for (auto msg = messages.begin(); msg != messages.end(); msg++) {
for (auto cred = credentials_types.begin(); for (auto cred = credentials_types.begin();
cred != credentials_types.end(); ++cred) { cred != credentials_types.end(); ++cred) {
scenarios.emplace_back(false, false, *cred, health_check_service, *msg); scenarios.emplace_back(false, *cred, health_check_service, *msg);
if (test_disable_blocking) {
scenarios.emplace_back(true, false, *cred, health_check_service,
*msg);
}
} }
if (insec_ok()) { if (insec_ok()) {
scenarios.emplace_back(false, true, kInsecureCredentialsType, scenarios.emplace_back(true, kInsecureCredentialsType,
health_check_service, *msg); health_check_service, *msg);
} }
} }
@ -1952,12 +1768,10 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
} }
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest, INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
::testing::ValuesIn(CreateTestScenarios(true, true, ::testing::ValuesIn(CreateTestScenarios(true, 1024)));
1024)));
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
AsyncEnd2endServerTryCancelTest, AsyncEnd2endServerTryCancelTest,
::testing::ValuesIn(CreateTestScenarios(false, false, ::testing::ValuesIn(CreateTestScenarios(false, 0)));
0)));
} // namespace } // namespace
} // namespace testing } // namespace testing

Loading…
Cancel
Save