Fix a race on vptr for UnimplementedAsyncRequest (#32547)

It is reported in https://github.com/grpc/grpc/issues/32356 that there
is a race on vptr for `UnimplementedAsyncRequest` which would cause
crashes for multi-threaded server if clients send unimplemented RPC
request to the server.

The cause is that the server requests a call for
`UnimplementedAsyncRequest` in its base class `GenericAsyncRequest` when
the `vptr` still points to the base class's `vtable`. If the call went
in and another server thread picks up the tag before the `vptr` points
back to the derived class's `vtable`, it would call the wrong virtual
function and also this is a data race. This fix makes the request of the
call inside the derived class's constructor.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32605/head
Yijie Ma 2 years ago committed by GitHub
parent 40ccf97217
commit ac7faf75ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      include/grpcpp/server_interface.h
  2. 27
      src/cpp/server/server_cc.cc
  3. 82
      test/cpp/server/server_request_call_test.cc

@ -301,10 +301,13 @@ class ServerInterface : public internal::CallHook {
internal::ServerAsyncStreamingInterface* stream,
grpc::CompletionQueue* call_cq,
grpc::ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
bool delete_on_finalize, bool issue_request = true);
bool FinalizeResult(void** tag, bool* status) override;
protected:
void IssueRequest();
private:
grpc_call_details call_details_;
};

@ -247,18 +247,16 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest(
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize,
bool issue_request)
: BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
delete_on_finalize) {
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
// The following call_start_batch is internally-generated so no need for an
// explanatory log on failure.
GPR_ASSERT(grpc_server_request_call(server->server(), &call_, &call_details_,
context->client_metadata_.arr(),
call_cq->cq(), notification_cq->cq(),
this) == GRPC_CALL_OK);
if (issue_request) {
IssueRequest();
}
}
bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
@ -286,6 +284,15 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
return BaseAsyncRequest::FinalizeResult(tag, status);
}
void ServerInterface::GenericAsyncRequest::IssueRequest() {
// The following call_start_batch is internally-generated so no need for an
// explanatory log on failure.
GPR_ASSERT(grpc_server_request_call(server_->server(), &call_, &call_details_,
context_->client_metadata_.arr(),
call_cq_->cq(), notification_cq_->cq(),
this) == GRPC_CALL_OK);
}
namespace {
class ShutdownCallback : public grpc_completion_queue_functor {
public:
@ -325,7 +332,11 @@ class Server::UnimplementedAsyncRequest final
UnimplementedAsyncRequest(ServerInterface* server,
grpc::ServerCompletionQueue* cq)
: GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
nullptr, false) {}
/*tag=*/nullptr, /*delete_on_finalize=*/false,
/*issue_request=*/false) {
// Issue request here instead of the base class to prevent race on vptr.
IssueRequest();
}
bool FinalizeResult(void** tag, bool* status) override;

@ -155,6 +155,88 @@ TEST(ServerRequestCallTest, ShortDeadlineDoesNotCauseOkayFalse) {
t.join();
}
void ServerFunction(ServerCompletionQueue* cq, std::atomic_bool* shutdown) {
for (;;) {
bool ok;
void* tag;
if (!cq->Next(&tag, &ok)) {
break;
}
if (shutdown->load()) {
break;
}
// For UnimplementedAsyncRequest, the server handles it internally and never
// returns from Next except when shutdown.
grpc_core::Crash("unreached");
}
}
void ClientFunction(testing::UnimplementedEchoService::Stub* stub) {
constexpr int kNumRpcPerThreads = 5000;
for (int i = 0; i < kNumRpcPerThreads; i++) {
testing::EchoRequest request;
request.set_message("foobar");
testing::EchoResponse response;
grpc::ClientContext ctx;
grpc::Status status = stub->Unimplemented(&ctx, request, &response);
EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
}
}
TEST(ServerRequestCallTest, MultithreadedUnimplementedService) {
std::atomic_bool shutdown(false);
// grpc server config.
std::ostringstream s;
int p = grpc_pick_unused_port_or_die();
s << "[::1]:" << p;
const string address = s.str();
testing::EchoTestService::AsyncService service;
ServerBuilder builder;
builder.AddListeningPort(address, InsecureServerCredentials());
auto cq = builder.AddCompletionQueue();
builder.RegisterService(&service);
auto server = builder.BuildAndStart();
ServerContext ctx;
testing::EchoRequest req;
ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
reinterpret_cast<void*>(1));
// server threads
constexpr int kNumServerThreads = 2;
std::vector<std::thread> server_threads;
server_threads.reserve(kNumServerThreads);
for (int i = 0; i < kNumServerThreads; i++) {
server_threads.emplace_back(ServerFunction, cq.get(), &shutdown);
}
auto stub = testing::UnimplementedEchoService::NewStub(
grpc::CreateChannel(address, InsecureChannelCredentials()));
// client threads
constexpr int kNumClientThreads = 2;
std::vector<std::thread> client_threads;
client_threads.reserve(kNumClientThreads);
for (int i = 0; i < kNumClientThreads; i++) {
client_threads.emplace_back(ClientFunction, stub.get());
}
for (auto& t : client_threads) {
t.join();
}
// Shut down everything properly.
gpr_log(GPR_INFO, "Shutting down.");
shutdown.store(true);
server->Shutdown();
cq->Shutdown();
server->Wait();
for (auto& t : server_threads) {
t.join();
}
}
} // namespace
} // namespace grpc

Loading…
Cancel
Save