Merge pull request #18381 from rmstar/concurrentrequests

Fix bug in CFStream endpoint
pull/18552/head
rmstar 6 years ago committed by GitHub
commit 2e2cecebbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      src/core/lib/iomgr/cfstream_handle.cc
  2. 150
      test/cpp/end2end/cfstream_test.cc
  3. 1
      test/cpp/end2end/test_service_impl.cc

@ -29,6 +29,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error_cfstream.h"
#include "src/core/lib/iomgr/exec_ctx.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
@ -54,6 +55,8 @@ void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
void* client_callback_info) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_error* error;
CFErrorRef stream_error;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
@ -68,8 +71,15 @@ void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
handle->read_event_.SetReady();
break;
case kCFStreamEventErrorOccurred:
handle->open_event_.SetReady();
handle->read_event_.SetReady();
stream_error = CFReadStreamCopyError(stream);
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
CFRelease(stream_error);
handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
break;
default:
GPR_UNREACHABLE_CODE(return );
@ -80,6 +90,8 @@ void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
void* clientCallBackInfo) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_error* error;
CFErrorRef stream_error;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
@ -94,8 +106,15 @@ void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
handle->write_event_.SetReady();
break;
case kCFStreamEventErrorOccurred:
handle->open_event_.SetReady();
handle->write_event_.SetReady();
stream_error = CFWriteStreamCopyError(stream);
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
CFRelease(stream_error);
handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
break;
default:
GPR_UNREACHABLE_CODE(return );

@ -47,8 +47,10 @@
#include "test/cpp/end2end/test_service_impl.h"
#ifdef GRPC_CFSTREAM
using grpc::ClientAsyncResponseReader;
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using grpc::testing::RequestParams;
using std::chrono::system_clock;
namespace grpc {
@ -60,8 +62,7 @@ class CFStreamTest : public ::testing::Test {
CFStreamTest()
: server_host_("grpctest"),
interface_("lo0"),
ipv4_address_("10.0.0.1"),
netmask_("/32"),
ipv4_address_("127.0.0.2"),
kRequestMessage_("🖖") {}
void DNSUp() {
@ -92,11 +93,13 @@ class CFStreamTest : public ::testing::Test {
}
void NetworkUp() {
gpr_log(GPR_DEBUG, "Bringing network up");
InterfaceUp();
DNSUp();
}
void NetworkDown() {
gpr_log(GPR_DEBUG, "Bringing network down");
InterfaceDown();
DNSDown();
}
@ -149,6 +152,27 @@ class CFStreamTest : public ::testing::Test {
EXPECT_TRUE(status.ok());
}
}
void SendAsyncRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
RequestParams param = RequestParams()) {
EchoRequest request;
auto msg = std::to_string(ctr.load());
request.set_message(msg);
ctr++;
*request.mutable_param() = std::move(param);
AsyncClientCall* call = new AsyncClientCall;
call->response_reader =
stub->PrepareAsyncEcho(&call->context, request, &cq_);
call->response_reader->StartCall();
gpr_log(GPR_DEBUG, "Sending request: %s", msg.c_str());
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
void ShutdownCQ() { cq_.Shutdown(); }
bool CQNext(void** tag, bool* ok) { return cq_.Next(tag, ok); }
bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
const gpr_timespec deadline =
@ -172,6 +196,13 @@ class CFStreamTest : public ::testing::Test {
return true;
}
struct AsyncClientCall {
EchoResponse reply;
ClientContext context;
Status status;
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
};
private:
struct ServerData {
int port_;
@ -214,14 +245,14 @@ class CFStreamTest : public ::testing::Test {
}
};
CompletionQueue cq_;
const grpc::string server_host_;
const grpc::string interface_;
const grpc::string ipv4_address_;
const grpc::string netmask_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerData> server_;
int port_;
const grpc::string kRequestMessage_;
std::atomic_int ctr{0};
};
// gRPC should automatically detech network flaps (without enabling keepalives)
@ -261,6 +292,117 @@ TEST_F(CFStreamTest, NetworkTransition) {
sender.join();
}
// Network flaps while RPCs are in flight
TEST_F(CFStreamTest, NetworkFlapRpcsInFlight) {
auto channel = BuildChannel();
auto stub = BuildStub(channel);
std::atomic_int rpcs_sent{0};
// Channel should be in READY state after we send some RPCs
for (int i = 0; i < 10; ++i) {
SendAsyncRpc(stub);
++rpcs_sent;
}
EXPECT_TRUE(WaitForChannelReady(channel.get()));
// Bring down the network
NetworkDown();
std::thread thd = std::thread([this, &rpcs_sent]() {
void* got_tag;
bool ok = false;
bool network_down = true;
int total_completions = 0;
while (CQNext(&got_tag, &ok)) {
++total_completions;
GPR_ASSERT(ok);
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
if (call->status.ok()) {
gpr_log(GPR_DEBUG, "RPC response: %s", call->reply.message().c_str());
} else {
gpr_log(GPR_DEBUG, "RPC failed with error: %s",
call->status.error_message().c_str());
// Bring network up when RPCs start failing
if (network_down) {
NetworkUp();
network_down = false;
}
}
delete call;
}
EXPECT_EQ(total_completions, rpcs_sent);
});
for (int i = 0; i < 100; ++i) {
SendAsyncRpc(stub);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
++rpcs_sent;
}
ShutdownCQ();
thd.join();
}
// Send a bunch of RPCs, some of which are expected to fail.
// We should get back a response for all RPCs
TEST_F(CFStreamTest, ConcurrentRpc) {
auto channel = BuildChannel();
auto stub = BuildStub(channel);
std::atomic_int rpcs_sent{0};
std::thread thd = std::thread([this, &rpcs_sent]() {
void* got_tag;
bool ok = false;
bool network_down = true;
int total_completions = 0;
while (CQNext(&got_tag, &ok)) {
++total_completions;
GPR_ASSERT(ok);
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
if (call->status.ok()) {
gpr_log(GPR_DEBUG, "RPC response: %s", call->reply.message().c_str());
} else {
gpr_log(GPR_DEBUG, "RPC failed: %s",
call->status.error_message().c_str());
// Bring network up when RPCs start failing
if (network_down) {
NetworkUp();
network_down = false;
}
}
delete call;
}
EXPECT_EQ(total_completions, rpcs_sent);
});
for (int i = 0; i < 10; ++i) {
if (i % 3 == 0) {
RequestParams param;
ErrorStatus* error = param.mutable_expected_error();
error->set_code(StatusCode::INTERNAL);
error->set_error_message("internal error");
SendAsyncRpc(stub, param);
} else if (i % 5 == 0) {
RequestParams param;
param.set_echo_metadata(true);
DebugInfo* info = param.mutable_debug_info();
info->add_stack_entries("stack_entry1");
info->add_stack_entries("stack_entry2");
info->set_detail("detailed debug info");
SendAsyncRpc(stub, param);
} else {
SendAsyncRpc(stub);
}
++rpcs_sent;
}
ShutdownCQ();
thd.join();
}
} // namespace
} // namespace testing
} // namespace grpc

@ -143,6 +143,7 @@ void LoopUntilCancelled(Alarm* alarm, ServerContext* context,
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str());
// A bit of sleep to make sure that short deadline tests fail
if (request->has_param() && request->param().server_sleep_us() > 0) {
gpr_sleep_until(

Loading…
Cancel
Save