Binder transport: Fix server issue when handling many parallel RPC calls (#29483)

When a single closure is scheduled multiple times before it runs, it
runs fewer times than the number of times it was scheduled.

As a result, when the server receives multiple RPC calls in a very
short time frame, `accept_stream_locked` is not called once for every
RPC call received.
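
To make the failure mode concrete, here is a minimal sketch (simplified stand-in Closure/Combiner types, not gRPC's actual closure or combiner code) of why re-scheduling one long-lived closure object can drop runs, while creating a fresh closure per event does not:

// Illustration only: these types are simplified stand-ins. A closure is
// modeled as a node that can sit in the run queue at most once, so scheduling
// it again while it is still pending is silently lost.
#include <cassert>
#include <functional>
#include <vector>

struct Closure {
  std::function<void()> fn;
  bool queued = false;  // stands in for the intrusive "next" slot of a real closure
};

struct Combiner {
  std::vector<Closure*> queue;
  void Run(Closure* c) {
    if (c->queued) return;  // a second schedule of the same object is dropped
    c->queued = true;
    queue.push_back(c);
  }
  void Drain() {
    for (Closure* c : queue) {
      c->queued = false;
      c->fn();
    }
    queue.clear();
  }
};

int main() {
  Combiner combiner;
  int accepted = 0;

  // Buggy pattern: one long-lived closure scheduled once per incoming RPC.
  Closure accept_stream{[&] { ++accepted; }};
  combiner.Run(&accept_stream);  // RPC #1 arrives
  combiner.Run(&accept_stream);  // RPC #2 arrives before the combiner drains
  combiner.Drain();
  assert(accepted == 1);  // one accept callback was lost

  // Fixed pattern: a fresh closure per incoming RPC, so nothing is dropped.
  accepted = 0;
  Closure a{[&] { ++accepted; }};
  Closure b{[&] { ++accepted; }};
  combiner.Run(&a);
  combiner.Run(&b);
  combiner.Drain();
  assert(accepted == 2);
  return 0;
}

This is the same shape as the fix in the diff below: the constructor no longer reuses a single accept_stream_closure member and instead passes GRPC_CLOSURE_CREATE(accept_stream_locked, this, nullptr) to combiner->Run() on every accept callback.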

We are working on an internal server-side stress test to make sure this
kind of error does not happen again.

This commit also uses an atomic int for issuing new stream ids. Without
this, there is a 10% probability of a stream id collision when 128
parallel RPC calls are initiated at the same time.
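
For the stream-id part, a hedged standalone sketch (the counter below is a free-standing stand-in, not the member inside grpc_binder_transport, and the kFirstCallId value here is made up): with a plain int, two concurrent init_stream calls can read the same value before either writes the increment back, which is the collision described above; std::atomic<int>::fetch_add hands every caller a distinct value.

// Illustration only: a standalone counter, not the real grpc_binder_transport
// member. kFirstCallId stands in for grpc_binder::kFirstCallId.
#include <atomic>
#include <cstdio>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

constexpr int kFirstCallId = 100;  // placeholder value for this sketch
std::atomic<int> next_free_tx_code{kFirstCallId};

int NewStreamTxCode() {
  // fetch_add returns the previous value, so each of the 128 concurrent
  // callers below receives a different stream id.
  return next_free_tx_code.fetch_add(1);
}

int main() {
  std::mutex mu;
  std::set<int> ids;
  std::vector<std::thread> threads;
  for (int i = 0; i < 128; ++i) {
    threads.emplace_back([&] {
      int id = NewStreamTxCode();
      std::lock_guard<std::mutex> lock(mu);
      ids.insert(id);
    });
  }
  for (auto& t : threads) t.join();
  std::printf("unique ids: %zu\n", ids.size());  // prints 128: no collisions
  return 0;
}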
Ming-Chuan authored 3 years ago · committed by GitHub
parent d4680eb8eb
commit cea03edc0b
Changed files:
  1. src/core/ext/transport/binder/transport/binder_transport.cc (9 lines changed)
  2. src/core/ext/transport/binder/transport/binder_transport.h (5 lines changed)
  3. test/core/transport/binder/end2end/binder_server_test.cc (40 lines changed)

@@ -107,9 +107,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
   GPR_TIMER_SCOPE("init_stream", 0);
   gpr_log(GPR_INFO, "%s = %p %p %p %p %p", __func__, gt, gs, refcount,
           server_data, arena);
+  // Note that this function is not locked and may be invoked concurrently
   grpc_binder_transport* t = reinterpret_cast<grpc_binder_transport*>(gt);
-  // TODO(mingcl): Figure out if we need to worry about concurrent invocation
-  // here
   new (gs) grpc_binder_stream(t, refcount, server_data, arena,
                               t->NewStreamTxCode(), t->is_client);
@@ -736,13 +735,13 @@ grpc_binder_transport::grpc_binder_transport(
       refs(1, nullptr) {
   gpr_log(GPR_INFO, __func__);
   base.vtable = get_vtable();
-  GRPC_CLOSURE_INIT(&accept_stream_closure, accept_stream_locked, this,
-                    nullptr);
   transport_stream_receiver =
       std::make_shared<grpc_binder::TransportStreamReceiverImpl>(
           is_client, /*accept_stream_callback=*/[this] {
             grpc_core::ExecCtx exec_ctx;
-            combiner->Run(&accept_stream_closure, GRPC_ERROR_NONE);
+            combiner->Run(
+                GRPC_CLOSURE_CREATE(accept_stream_locked, this, nullptr),
+                GRPC_ERROR_NONE);
           });
   // WireReader holds a ref to grpc_binder_transport.
   GRPC_BINDER_REF_TRANSPORT(this, "wire reader");

@@ -17,6 +17,7 @@
 #include <grpc/support/port_platform.h>
 
+#include <atomic>
 #include <memory>
 #include <string>
 #include <utility>
@@ -68,8 +69,6 @@ struct grpc_binder_transport {
   absl::flat_hash_map<int, grpc_binder_stream*> registered_stream;
   grpc_core::Combiner* combiner;
 
-  grpc_closure accept_stream_closure;
-
   // The callback and the data for the callback when the stream is connected
   // between client and server.
   void (*accept_stream_fn)(void* user_data, grpc_transport* transport,
@@ -80,7 +79,7 @@ struct grpc_binder_transport {
   grpc_core::RefCount refs;
 
  private:
-  int next_free_tx_code = grpc_binder::kFirstCallId;
+  std::atomic<int> next_free_tx_code{grpc_binder::kFirstCallId};
 };
 
 grpc_transport* grpc_create_binder_transport_client(

@@ -207,6 +207,46 @@ TEST_F(BinderServerTest, CreateChannelWithEndpointBinderMultipleConnections) {
   server->Shutdown();
 }
 
+TEST_F(BinderServerTest, CreateChannelWithEndpointBinderParallelRequests) {
+  grpc::ServerBuilder server_builder;
+  grpc::testing::TestServiceImpl service;
+  server_builder.RegisterService(&service);
+  server_builder.AddListeningPort("binder:example.service",
+                                  grpc::testing::BinderServerCredentials());
+  std::unique_ptr<grpc::Server> server = server_builder.BuildAndStart();
+  void* raw_endpoint_binder =
+      grpc::experimental::binder::GetEndpointBinder("example.service");
+  std::unique_ptr<grpc_binder::Binder> endpoint_binder =
+      absl::make_unique<grpc_binder::end2end_testing::FakeBinder>(
+          static_cast<grpc_binder::end2end_testing::FakeEndpoint*>(
+              raw_endpoint_binder));
+  std::shared_ptr<grpc::Channel> channel =
+      grpc::testing::CreateBinderChannel(std::move(endpoint_binder));
+  std::unique_ptr<grpc::testing::EchoTestService::Stub> stub =
+      grpc::testing::EchoTestService::NewStub(channel);
+
+  constexpr size_t kNumRequests = 128;
+  auto thread_fn = [&](size_t id) {
+    grpc::testing::EchoRequest request;
+    std::string msg = absl::StrFormat("BinderServerBuilder-%d", id);
+    request.set_message(msg);
+    grpc::testing::EchoResponse response;
+    grpc::ClientContext context;
+    grpc::Status status = stub->Echo(&context, request, &response);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(response.message(), msg);
+  };
+
+  std::vector<std::thread> threads(kNumRequests);
+  for (size_t i = 0; i < kNumRequests; ++i) {
+    threads[i] = std::thread(thread_fn, i);
+  }
+  for (auto& thr : threads) {
+    thr.join();
+  }
+  server->Shutdown();
+}
+
 }  // namespace
 
 int main(int argc, char** argv) {
