Fix async_end2end_test flow control

Completion queues + flow control + single threading is hard.

We need a read outstanding on a call to grant flow control tokens to the
remote end.

To do that we need to request a read *before* we wait for the write to
be finished, otherwise, in the case of a large write we'll block waiting
for flow control tokens.

Built on #6402
pull/6412/head
Craig Tiller 9 years ago
parent 5f51757f39
commit e4d2748f2f
  1. 131
      test/cpp/end2end/async_end2end_test.cc

@ -281,10 +281,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -345,12 +346,9 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@ -384,31 +382,35 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(6));
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(9, true)
.Expect(10, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -442,24 +444,27 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(8));
Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
@ -493,31 +498,35 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
Verifier(GetParam().disable_blocking).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam().disable_blocking).Expect(8, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(9, true)
.Expect(10, true)
.Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@ -562,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -612,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -652,11 +662,13 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(4, true)
.Expect(5, true)
.Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -730,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -807,12 +821,13 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Expect(5, true)
.Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());

Loading…
Cancel
Save