Async client api change. Add a ClientAsyncResponseReader. Make the api similar to streaming and symmetric to server side.

pull/588/head
Yang Gao 10 years ago
parent d0d007e6d1
commit 3a5e5495e5
  1. 1
      Makefile
  2. 1
      build.json
  3. 144
      include/grpc++/async_unary_call.h
  4. 4
      include/grpc++/client_context.h
  5. 7
      include/grpc++/impl/client_unary_call.h
  6. 54
      include/grpc++/stream.h
  7. 22
      src/compiler/cpp_generator.cc
  8. 26
      src/cpp/client/client_unary_call.cc
  9. 82
      test/cpp/end2end/async_end2end_test.cc

@ -2978,6 +2978,7 @@ LIBGRPC++_SRC = \
src/cpp/util/time.cc \
PUBLIC_HEADERS_CXX += \
include/grpc++/async_unary_call.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
include/grpc++/client_context.h \

@ -398,6 +398,7 @@
"build": "all",
"language": "c++",
"public_headers": [
"include/grpc++/async_unary_call.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/client_context.h",

@ -0,0 +1,144 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_ASYNC_UNARY_CALL_H__
#define __GRPCPP_ASYNC_UNARY_CALL_H__
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
namespace grpc {
template <class R>
class ClientAsyncResponseReader final {
public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const google::protobuf::Message& request, void* tag)
: context_(context),
call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(msg);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
private:
ClientContext* context_ = nullptr;
Call call_;
CallOpBuffer init_buf_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};
template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
void BindCall(Call* call) override { call_ = *call; }
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};
} // namespace grpc
#endif // __GRPCPP_ASYNC_UNARY_CALL_H__

@ -72,6 +72,8 @@ template <class W>
class ClientAsyncWriter;
template <class R, class W>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
class ClientContext {
public:
@ -119,6 +121,8 @@ class ClientContext {
friend class ::grpc::ClientAsyncWriter;
template <class R, class W>
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
grpc_call *call() { return call_; }
void set_call(grpc_call *call) {

@ -48,13 +48,6 @@ class CompletionQueue;
class RpcMethod;
class Status;
// Wrapper that begins an asynchronous unary call
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag);
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,

@ -550,60 +550,6 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
CallOpBuffer finish_buf_;
};
// TODO(yangg) Move out of stream.h
template <class W>
class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
meta_buf_.Reset(tag);
meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_.PerformOps(&meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.IsOk()) {
finish_buf_.AddSendMessage(msg);
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.IsOk());
finish_buf_.Reset(tag);
if (!ctx_->sent_initial_metadata_) {
finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
bool cancelled = false;
finish_buf_.AddServerRecvClose(&cancelled);
finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
void BindCall(Call* call) override { call_ = *call; }
Call call_;
ServerContext* ctx_;
CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_;
};
template <class W, class R>
class ServerAsyncReader : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {

@ -126,6 +126,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
"class RpcService;\n"
"class ServerContext;\n";
if (HasUnaryCalls(file)) {
temp.append(
"template <class OutMessage> class ClientAsyncResponseReader;\n");
temp.append(
"template <class OutMessage> class ServerAsyncResponseWriter;\n");
}
@ -160,7 +162,8 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
}
std::string GetSourceIncludes() {
return "#include <grpc++/channel_interface.h>\n"
return "#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/channel_interface.h>\n"
"#include <grpc++/impl/client_unary_call.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/rpc_service_method.h>\n"
@ -181,9 +184,9 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response);\n");
printer->Print(*vars,
"void $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, "
"::grpc::Status* status, "
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
@ -378,14 +381,15 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"context, request, response);\n"
"}\n\n");
printer->Print(*vars,
"void $Service$::Stub::$Method$("
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response, ::grpc::Status* status, "
"::grpc::ClientAsyncResponseReader< $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" ::grpc::AsyncUnaryCall(channel(),"
" return new ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
"::grpc::RpcMethod($Service$_method_names[$Idx$]), "
"context, request, response, status, cq, tag);\n"
"context, request, tag);\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(

@ -60,30 +60,4 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
return status;
}
class ClientAsyncRequest final : public CallOpBuffer {
public:
void FinalizeResult(void **tag, bool *status) override {
CallOpBuffer::FinalizeResult(tag, status);
delete this;
}
};
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag) {
ClientAsyncRequest *buf = new ClientAsyncRequest;
buf->Reset(tag);
Call call(channel->CreateCall(method, context, cq));
buf->AddSendInitialMetadata(context);
buf->AddSendMessage(request);
buf->AddRecvInitialMetadata(context);
buf->AddRecvMessage(result);
buf->AddClientSendClose();
buf->AddClientRecvStatus(context, status);
call.PerformOps(buf);
}
} // namespace grpc

@ -38,6 +38,7 @@
#include "test/cpp/util/echo_duplicate.pb.h"
#include "test/cpp/util/echo.pb.h"
#include "src/cpp/util/time.h"
#include <grpc++/async_unary_call.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
@ -124,21 +125,23 @@ class AsyncEnd2endTest : public ::testing::Test {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
response_reader(stub_->Echo(
&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
client_ok(1);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
server_ok(3);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(4));
client_ok(4);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
@ -341,8 +344,8 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -352,13 +355,15 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, client_initial_metadata.size());
client_ok(1);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
server_ok(3);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(4));
client_ok(4);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
@ -381,8 +386,8 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -390,22 +395,26 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
client_ok(1);
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(4));
response_reader->ReadInitialMetadata(tag(4));
client_ok(4);
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
server_ok(4);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
server_ok(5);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(6));
client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
}
TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
@ -425,8 +434,8 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -434,6 +443,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
client_ok(1);
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
@ -442,8 +452,9 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
server_ok(4);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(5));
client_ok(5);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -467,17 +478,20 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
send_request.set_message("Hello");
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
std::pair<grpc::string, grpc::string> meta2(
"key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
std::pair<grpc::string, grpc::string> meta3("key3", "val3");
std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
std::pair<grpc::string, grpc::string> meta6("key4-bin",
{"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
std::pair<grpc::string, grpc::string> meta5("key5", "val5");
std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
std::pair<grpc::string, grpc::string> meta4("key6-bin",
{"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->Echo(&cli_ctx, send_request, &cli_cq_, tag(1)));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
@ -487,27 +501,31 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, client_initial_metadata.size());
client_ok(1);
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
response_reader->ReadInitialMetadata(tag(4));
client_ok(4);
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(4));
response_writer.Finish(send_response, Status::OK, tag(5));
server_ok(4);
server_ok(5);
client_ok(1);
response_reader->Finish(&recv_response, &recv_status, tag(6));
client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second);
EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);

Loading…
Cancel
Save