Merge pull request #1494 from ctiller/the-churnening

Lose redundant tag on unary calls
pull/1490/merge
Yang Gao 10 years ago
commit 903810191e
  1. 2
      Makefile
  2. 2
      build.json
  3. 5
      include/grpc++/async_unary_call.h
  4. 8
      include/grpc++/impl/call.h
  5. 17
      src/compiler/cpp_generator.cc
  6. 18
      test/cpp/end2end/async_end2end_test.cc
  7. 8
      test/cpp/end2end/mock_test.cc
  8. 23
      test/cpp/qps/client_async.cc

@ -308,7 +308,7 @@ E = @echo
Q = @ Q = @
endif endif
VERSION = 0.7.0.0 VERSION = 0.8.0.0
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES)) CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS) CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)

@ -6,7 +6,7 @@
"#": "The public version number of the library.", "#": "The public version number of the library.",
"version": { "version": {
"major": 0, "major": 0,
"minor": 7, "minor": 8,
"micro": 0, "micro": 0,
"build": 0 "build": 0
} }

@ -60,9 +60,8 @@ class ClientAsyncResponseReader GRPC_FINAL
public: public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request, void* tag) const grpc::protobuf::Message& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) { : context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request); init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose(); init_buf_.AddClientSendClose();
@ -90,7 +89,7 @@ class ClientAsyncResponseReader GRPC_FINAL
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
CallOpBuffer init_buf_; SneakyCallOpBuffer init_buf_;
CallOpBuffer meta_buf_; CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_; CallOpBuffer finish_buf_;
}; };

@ -123,6 +123,14 @@ class CallOpBuffer : public CompletionQueueTag {
bool* recv_closed_; bool* recv_closed_;
}; };
// SneakyCallOpBuffer does not post completions to the completion queue
class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
return CallOpBuffer::FinalizeResult(tag, status) && false;
}
};
// Channel and Server implement this to allow them to hook performing ops // Channel and Server implement this to allow them to hook performing ops
class CallHook { class CallHook {
public: public:

@ -160,13 +160,13 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReaderInterface< $Response$>> " "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n"); "Async$Method$Raw(context, request, cq));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
@ -257,7 +257,7 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n"); "::grpc::CompletionQueue* cq) = 0;\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -305,7 +305,6 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
grpc_cpp_generator::ClassName(method->input_type(), true); grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] = (*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true); grpc_cpp_generator::ClassName(method->output_type(), true);
if (is_public) { if (is_public) {
if (NoStreaming(method)) { if (NoStreaming(method)) {
printer->Print( printer->Print(
@ -317,13 +316,13 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncResponseReader< $Response$>>(" "::grpc::ClientAsyncResponseReader< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n"); "Async$Method$Raw(context, request, cq));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
@ -412,7 +411,7 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>* " "::grpc::ClientAsyncResponseReader< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n"); "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -739,13 +738,13 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>* " "::grpc::ClientAsyncResponseReader< $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars, printer->Print(*vars,
" return new " " return new "
"::grpc::ClientAsyncResponseReader< $Response$>(" "::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, " "channel(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request, tag);\n" "context, request);\n"
"}\n\n"); "}\n\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,

@ -140,14 +140,13 @@ class AsyncEnd2endTest : public ::testing::Test {
send_request.set_message("Hello"); send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2)); tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
client_ok(1);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -195,7 +194,7 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
send_request.set_message("Hello"); send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
std::chrono::system_clock::time_point time_now( std::chrono::system_clock::time_point time_now(
std::chrono::system_clock::now()); std::chrono::system_clock::now());
@ -209,7 +208,6 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
verify_timed_ok(&srv_cq_, 2, true, time_limit); verify_timed_ok(&srv_cq_, 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
verify_timed_ok(&cli_cq_, 1, true, time_limit);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -399,7 +397,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
cli_ctx.AddMetadata(meta2.first, meta2.second); cli_ctx.AddMetadata(meta2.first, meta2.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2)); tag(2));
@ -409,7 +407,6 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size()); EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size());
client_ok(1);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -441,7 +438,7 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2)); tag(2));
@ -449,7 +446,6 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
client_ok(1);
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
server_ok(3); server_ok(3);
@ -489,7 +485,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2)); tag(2));
@ -497,7 +493,6 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
server_ok(3); server_ok(3);
client_ok(1);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
@ -550,7 +545,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
cli_ctx.AddMetadata(meta2.first, meta2.second); cli_ctx.AddMetadata(meta2.first, meta2.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2)); tag(2));
@ -560,7 +555,6 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size()); EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size());
client_ok(1);
srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second);

@ -119,8 +119,8 @@ class MockStub : public TestService::StubInterface {
private: private:
ClientAsyncResponseReaderInterface<EchoResponse>* AsyncEchoRaw( ClientAsyncResponseReaderInterface<EchoResponse>* AsyncEchoRaw(
ClientContext* context, const EchoRequest& request, CompletionQueue* cq, ClientContext* context, const EchoRequest& request,
void* tag) GRPC_OVERRIDE { CompletionQueue* cq) GRPC_OVERRIDE {
return nullptr; return nullptr;
} }
ClientWriterInterface<EchoRequest>* RequestStreamRaw( ClientWriterInterface<EchoRequest>* RequestStreamRaw(
@ -151,8 +151,8 @@ class MockStub : public TestService::StubInterface {
return nullptr; return nullptr;
} }
ClientAsyncResponseReaderInterface<EchoResponse>* AsyncUnimplementedRaw( ClientAsyncResponseReaderInterface<EchoResponse>* AsyncUnimplementedRaw(
ClientContext* context, const EchoRequest& request, CompletionQueue* cq, ClientContext* context, const EchoRequest& request,
void* tag) GRPC_OVERRIDE { CompletionQueue* cq) GRPC_OVERRIDE {
return nullptr; return nullptr;
} }
}; };

@ -75,19 +75,20 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
TestService::Stub* stub, const RequestType& req, TestService::Stub* stub, const RequestType& req,
std::function< std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&, TestService::Stub*, grpc::ClientContext*, const RequestType&)>
void*)> start_req, start_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
req_(req), req_(req),
response_(), response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent), next_state_(&ClientRpcContextUnaryImpl::RespDone),
callback_(on_done), callback_(on_done),
start_req_(start_req), start_req_(start_req),
start_(Timer::Now()), start_(Timer::Now()),
response_reader_( response_reader_(start_req(stub_, &context_, req_)) {
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool ret = (this->*next_state_)(ok); bool ret = (this->*next_state_)(ok);
@ -102,11 +103,6 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
} }
private: private:
bool ReqSent(bool) {
next_state_ = &ClientRpcContextUnaryImpl::RespDone;
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
return true;
}
bool RespDone(bool) { bool RespDone(bool) {
next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
return false; return false;
@ -122,8 +118,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
bool (ClientRpcContextUnaryImpl::*next_state_)(bool); bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&, void*)> TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_;
start_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@ -198,8 +193,8 @@ private:
const SimpleRequest& req) { const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, void* tag) { const SimpleRequest& request) {
return stub->AsyncUnaryCall(ctx, request, cq, tag); return stub->AsyncUnaryCall(ctx, request, cq);
}; };
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, req, start_req, check_done); stub, req, start_req, check_done);

Loading…
Cancel
Save