|
|
|
@ -278,7 +278,7 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
|
|
|
|
|
void* t; |
|
|
|
|
bool ok; |
|
|
|
|
int need_tags; |
|
|
|
|
int expect_tags; |
|
|
|
|
|
|
|
|
|
// Send 'max_ping_pongs' number of ping pong messages
|
|
|
|
|
int ping_pong_cnt = 0; |
|
|
|
@ -289,7 +289,7 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
request_rw->Write(send_request, tag(2)); // Start client send
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
need_tags = (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5); |
|
|
|
|
int await_tags = (1 << 2); |
|
|
|
|
|
|
|
|
|
if (ping_pong_cnt == 0) { |
|
|
|
|
// wait for the server call structure (call_hook, etc.) to be
|
|
|
|
@ -301,8 +301,8 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
// In some cases tag:2 comes before tag:0 (write tag comes out
|
|
|
|
|
// first), this while loop is to make sure get tag:0.
|
|
|
|
|
int i = (int)(intptr_t)t; |
|
|
|
|
GPR_ASSERT(need_tags & (1 << i)); |
|
|
|
|
need_tags &= ~(1 << i); |
|
|
|
|
GPR_ASSERT(await_tags & (1 << i)); |
|
|
|
|
await_tags &= ~(1 << i); |
|
|
|
|
GPR_ASSERT(fixture->cq()->Next(&t, &ok)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -310,7 +310,11 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
response_rw.Read(&recv_request, tag(3)); // Start server recv
|
|
|
|
|
request_rw->Read(&recv_response, tag(4)); // Start client recv
|
|
|
|
|
|
|
|
|
|
while (need_tags) { |
|
|
|
|
await_tags |= (1 << 3) | (1 << 4); |
|
|
|
|
expect_tags = await_tags; |
|
|
|
|
await_tags |= (1 << 5); |
|
|
|
|
|
|
|
|
|
while (await_tags != 0) { |
|
|
|
|
GPR_ASSERT(fixture->cq()->Next(&t, &ok)); |
|
|
|
|
GPR_ASSERT(ok); |
|
|
|
|
int i = (int)(intptr_t)t; |
|
|
|
@ -321,34 +325,39 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
if (write_and_finish == 1) { |
|
|
|
|
response_rw.WriteAndFinish(send_response, WriteOptions(), |
|
|
|
|
Status::OK, tag(5)); |
|
|
|
|
expect_tags |= (1 << 5); |
|
|
|
|
} else { |
|
|
|
|
response_rw.WriteLast(send_response, WriteOptions(), tag(5)); |
|
|
|
|
// WriteLast buffers the write, so neither server write op nor
|
|
|
|
|
// client read op will finish inside the while loop.
|
|
|
|
|
need_tags &= ~(1 << 4); |
|
|
|
|
need_tags &= ~(1 << 5); |
|
|
|
|
// WriteLast buffers the write, so it's possible neither server
|
|
|
|
|
// write op nor client read op will finish inside the while
|
|
|
|
|
// loop.
|
|
|
|
|
await_tags &= ~(1 << 4); |
|
|
|
|
await_tags &= ~(1 << 5); |
|
|
|
|
expect_tags |= (1 << 5); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
response_rw.Write(send_response, tag(5)); |
|
|
|
|
expect_tags |= (1 << 5); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(need_tags & (1 << i)); |
|
|
|
|
need_tags &= ~(1 << i); |
|
|
|
|
GPR_ASSERT(expect_tags & (1 << i)); |
|
|
|
|
expect_tags &= ~(1 << i); |
|
|
|
|
await_tags &= ~(1 << i); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ping_pong_cnt++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (max_ping_pongs == 0) { |
|
|
|
|
need_tags = (1 << 6) | (1 << 7) | (1 << 8); |
|
|
|
|
expect_tags |= (1 << 6) | (1 << 7) | (1 << 8); |
|
|
|
|
} else { |
|
|
|
|
if (write_and_finish == 1) { |
|
|
|
|
need_tags = (1 << 8); |
|
|
|
|
expect_tags |= (1 << 8); |
|
|
|
|
} else { |
|
|
|
|
// server's buffered write and the client's read of the buffered write
|
|
|
|
|
// tags should come up.
|
|
|
|
|
need_tags = (1 << 4) | (1 << 5) | (1 << 7) | (1 << 8); |
|
|
|
|
expect_tags |= (1 << 7) | (1 << 8); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -360,8 +369,8 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
GPR_ASSERT(fixture->cq()->Next(&t, &ok)); |
|
|
|
|
while ((int)(intptr_t)t != 0) { |
|
|
|
|
int i = (int)(intptr_t)t; |
|
|
|
|
GPR_ASSERT(need_tags & (1 << i)); |
|
|
|
|
need_tags &= ~(1 << i); |
|
|
|
|
GPR_ASSERT(expect_tags & (1 << i)); |
|
|
|
|
expect_tags &= ~(1 << i); |
|
|
|
|
GPR_ASSERT(fixture->cq()->Next(&t, &ok)); |
|
|
|
|
} |
|
|
|
|
response_rw.Finish(Status::OK, tag(7)); |
|
|
|
@ -374,11 +383,11 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) { |
|
|
|
|
Status recv_status; |
|
|
|
|
request_rw->Finish(&recv_status, tag(8)); |
|
|
|
|
|
|
|
|
|
while (need_tags) { |
|
|
|
|
while (expect_tags) { |
|
|
|
|
GPR_ASSERT(fixture->cq()->Next(&t, &ok)); |
|
|
|
|
int i = (int)(intptr_t)t; |
|
|
|
|
GPR_ASSERT(need_tags & (1 << i)); |
|
|
|
|
need_tags &= ~(1 << i); |
|
|
|
|
GPR_ASSERT(expect_tags & (1 << i)); |
|
|
|
|
expect_tags &= ~(1 << i); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(recv_status.ok()); |
|
|
|
|