|
|
|
@ -423,51 +423,61 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) { |
|
|
|
|
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) { |
|
|
|
|
MAYBE_SKIP_TEST; |
|
|
|
|
ResetStub(); |
|
|
|
|
std::mutex mu1, mu2, mu3; |
|
|
|
|
std::condition_variable cv; |
|
|
|
|
bool done = false; |
|
|
|
|
EchoRequest request1, request2, request3; |
|
|
|
|
request1.set_message("Hello locked world1."); |
|
|
|
|
request2.set_message("Hello locked world2."); |
|
|
|
|
request3.set_message("Hello locked world3."); |
|
|
|
|
EchoResponse response1, response2, response3; |
|
|
|
|
ClientContext cli_ctx1, cli_ctx2, cli_ctx3; |
|
|
|
|
{ |
|
|
|
|
std::lock_guard<std::mutex> l(mu1); |
|
|
|
|
|
|
|
|
|
// The request/response state associated with an RPC and the synchronization
|
|
|
|
|
// variables needed to notify its completion.
|
|
|
|
|
struct RpcState { |
|
|
|
|
std::mutex mu; |
|
|
|
|
std::condition_variable cv; |
|
|
|
|
bool done = false; |
|
|
|
|
EchoRequest request; |
|
|
|
|
EchoResponse response; |
|
|
|
|
ClientContext cli_ctx; |
|
|
|
|
|
|
|
|
|
RpcState() = default; |
|
|
|
|
~RpcState() { |
|
|
|
|
// Grab the lock to prevent destruction while another is still holding
|
|
|
|
|
// lock
|
|
|
|
|
std::lock_guard<std::mutex> lock(mu); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
std::vector<RpcState> rpc_state(3); |
|
|
|
|
for (size_t i = 0; i < rpc_state.size(); i++) { |
|
|
|
|
std::string message = "Hello locked world"; |
|
|
|
|
message += std::to_string(i); |
|
|
|
|
rpc_state[i].request.set_message(message); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Grab a lock and then start an RPC whose callback grabs the same lock and
|
|
|
|
|
// then calls this function to start the next RPC under lock (up to a limit of
|
|
|
|
|
// the size of the rpc_state vector).
|
|
|
|
|
std::function<void(int)> nested_call = [this, &nested_call, |
|
|
|
|
&rpc_state](int index) { |
|
|
|
|
std::lock_guard<std::mutex> l(rpc_state[index].mu); |
|
|
|
|
stub_->experimental_async()->Echo( |
|
|
|
|
&cli_ctx1, &request1, &response1, |
|
|
|
|
[this, &mu1, &mu2, &mu3, &cv, &done, &request1, &request2, &request3, |
|
|
|
|
&response1, &response2, &response3, &cli_ctx2, &cli_ctx3](Status s1) { |
|
|
|
|
std::lock_guard<std::mutex> l1(mu1); |
|
|
|
|
EXPECT_TRUE(s1.ok()); |
|
|
|
|
EXPECT_EQ(request1.message(), response1.message()); |
|
|
|
|
// start the second level of nesting
|
|
|
|
|
std::unique_lock<std::mutex> l2(mu2); |
|
|
|
|
this->stub_->experimental_async()->Echo( |
|
|
|
|
&cli_ctx2, &request2, &response2, |
|
|
|
|
[this, &mu2, &mu3, &cv, &done, &request2, &request3, &response2, |
|
|
|
|
&response3, &cli_ctx3](Status s2) { |
|
|
|
|
std::lock_guard<std::mutex> l2(mu2); |
|
|
|
|
EXPECT_TRUE(s2.ok()); |
|
|
|
|
EXPECT_EQ(request2.message(), response2.message()); |
|
|
|
|
// start the third level of nesting
|
|
|
|
|
std::lock_guard<std::mutex> l3(mu3); |
|
|
|
|
stub_->experimental_async()->Echo( |
|
|
|
|
&cli_ctx3, &request3, &response3, |
|
|
|
|
[&mu3, &cv, &done, &request3, &response3](Status s3) { |
|
|
|
|
std::lock_guard<std::mutex> l(mu3); |
|
|
|
|
EXPECT_TRUE(s3.ok()); |
|
|
|
|
EXPECT_EQ(request3.message(), response3.message()); |
|
|
|
|
done = true; |
|
|
|
|
cv.notify_all(); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
&rpc_state[index].cli_ctx, &rpc_state[index].request, |
|
|
|
|
&rpc_state[index].response, |
|
|
|
|
[index, &nested_call, &rpc_state](Status s) { |
|
|
|
|
std::lock_guard<std::mutex> l1(rpc_state[index].mu); |
|
|
|
|
EXPECT_TRUE(s.ok()); |
|
|
|
|
rpc_state[index].done = true; |
|
|
|
|
rpc_state[index].cv.notify_all(); |
|
|
|
|
// Call the next level of nesting if possible
|
|
|
|
|
if (index + 1 < rpc_state.size()) { |
|
|
|
|
nested_call(index + 1); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
std::unique_lock<std::mutex> l(mu3); |
|
|
|
|
while (!done) { |
|
|
|
|
cv.wait(l); |
|
|
|
|
nested_call(0); |
|
|
|
|
|
|
|
|
|
// Wait for completion notifications from all RPCs. Order doesn't matter.
|
|
|
|
|
for (RpcState& state : rpc_state) { |
|
|
|
|
std::unique_lock<std::mutex> l(state.mu); |
|
|
|
|
while (!state.done) { |
|
|
|
|
state.cv.wait(l); |
|
|
|
|
} |
|
|
|
|
EXPECT_EQ(state.request.message(), state.response.message()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|