diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 4bc540cb577..1821d666356 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -420,7 +420,8 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); context_->initial_metadata_received_ = true; } - read_buf_.AddRecvMessage(msg); + bool ignore; + read_buf_.AddRecvMessage(msg, &ignore); call_.PerformOps(&read_buf_); } @@ -445,7 +446,7 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, template class ClientAsyncWriter final : public ClientAsyncStreamingInterface, - public WriterInterface { + public AsyncWriterInterface { public: ClientAsyncWriter(ChannelInterface *channel, CompletionQueue* cq, const RpcMethod &method, ClientContext *context, @@ -472,7 +473,7 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, call_.PerformOps(&write_buf_); } - void WritesDone(void* tag) override { + void WritesDone(void* tag) { writes_done_buf_.Reset(tag); writes_done_buf_.AddClientSendClose(); call_.PerformOps(&writes_done_buf_); @@ -484,7 +485,8 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); context_->initial_metadata_received_ = true; } - finish_buf_.AddRecvMessage(response_, &got_message_); + bool ignore; + finish_buf_.AddRecvMessage(response_, &ignore); finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -509,7 +511,7 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, public: ClientAsyncReaderWriter(ChannelInterface *channel, CompletionQueue* cq, const RpcMethod &method, ClientContext *context, void* tag) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), call_(channel->CreateCall(method, context, cq)) { init_buf_.Reset(tag); init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); call_.PerformOps(&init_buf_); @@ -530,7 +532,8 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); context_->initial_metadata_received_ = true; } - read_buf_.AddRecvMessage(msg); + bool ignore; + read_buf_.AddRecvMessage(msg, &ignore); call_.PerformOps(&read_buf_); } @@ -540,7 +543,7 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, call_.PerformOps(&write_buf_); } - void WritesDone(void* tag) override { + void WritesDone(void* tag) { writes_done_buf_.Reset(tag); writes_done_buf_.AddClientSendClose(); call_.PerformOps(&writes_done_buf_); @@ -558,7 +561,6 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, private: ClientContext* context_ = nullptr; - CompletionQueue cq_; Call call_; CallOpBuffer init_buf_; CallOpBuffer meta_buf_; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index aa9be6db87d..1ab4c29451c 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -183,8 +183,8 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, printer->Print(*vars, "void $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response, " - "::grpc::Status *status, " - "::grpc::CompletionQueue *cq, void *tag);\n"); + "::grpc::Status* status, " + "::grpc::CompletionQueue* cq, void* tag);\n"); } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* $Method$(" @@ -192,8 +192,7 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, printer->Print(*vars, "::grpc::ClientAsyncWriter< $Request$>* $Method$(" "::grpc::ClientContext* context, $Response$* response, " - "::grpc::Status *status, " - "::grpc::CompletionQueue *cq, void *tag);\n"); + "::grpc::CompletionQueue* cq, void* tag);\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -202,7 +201,7 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, printer->Print(*vars, "::grpc::ClientAsyncReader< $Response$>* $Method$(" "::grpc::ClientContext* context, const $Request$* request, " - "::grpc::CompletionQueue *cq, void *tag);\n"); + "::grpc::CompletionQueue* cq, void* tag);\n"); } else if (BidiStreaming(method)) { printer->Print(*vars, "::grpc::ClientReaderWriter< $Request$, $Response$>* " @@ -210,7 +209,7 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, printer->Print(*vars, "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue *cq, void *tag);\n"); + "::grpc::CompletionQueue* cq, void* tag);\n"); } } @@ -378,6 +377,16 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::RpcMethod($Service$_method_names[$Idx$]), " "context, request, response);\n" "}\n\n"); + printer->Print(*vars, + "void $Service$::Stub::$Method$(" + "::grpc::ClientContext* context, " + "const $Request$& request, $Response$* response, ::grpc::Status* status, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + " ::grpc::AsyncUnaryCall(channel()," + "::grpc::RpcMethod($Service$_method_names[$Idx$]), " + "context, request, response, status, cq, tag);\n" + "}\n\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -390,6 +399,18 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " "context, response);\n" "}\n\n"); + printer->Print( + *vars, + "::grpc::ClientAsyncWriter< $Request$>* $Service$::Stub::$Method$(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + " return new ::grpc::ClientAsyncWriter< $Request$>(" + "channel(), cq, " + "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " + "context, response, tag);\n" + "}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -402,6 +423,18 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " "context, *request);\n" "}\n\n"); + printer->Print( + *vars, + "::grpc::ClientAsyncReader< $Response$>* $Service$::Stub::$Method$(" + "::grpc::ClientContext* context, const $Request$* request, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + " return new ::grpc::ClientAsyncReader< $Response$>(" + "channel(), cq, " + "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " + "context, *request, tag);\n" + "}\n\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, @@ -415,6 +448,19 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " "context);\n" "}\n\n"); + printer->Print( + *vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "$Service$::Stub::$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print( + *vars, + " return new ::grpc::ClientAsyncReaderWriter< $Request$, $Response$>(" + "channel(), cq, " + "::grpc::RpcMethod($Service$_method_names[$Idx$], " + "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " + "context, tag);\n" + "}\n\n"); } } diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index bc0e83733a2..1221630a35c 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -60,4 +60,11 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, return status; } +void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag) { + +} } // namespace grpc