From 6a48405ed003a416bd574d3f480b20e3dee9e9df Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 25 Jan 2016 19:12:37 -0800 Subject: [PATCH] pre sync_stream.cc creation. Does not compile --- BUILD | 9 + Makefile | 9 + build.yaml | 4 + include/grpc++/impl/codegen/async_stream.h | 462 ++++++++++++++++++ include/grpc++/impl/codegen/config_protobuf.h | 72 +++ include/grpc++/impl/codegen/proto_utils.h | 76 +++ include/grpc++/impl/codegen/stub_options.h | 43 ++ include/grpc++/impl/proto_utils.h | 39 +- include/grpc++/support/async_stream.h | 425 +--------------- include/grpc++/support/async_unary_call.h | 15 +- include/grpc++/support/config_protobuf.h | 35 +- include/grpc++/support/stub_options.h | 6 +- include/grpc++/support/sync_stream.h | 62 +-- src/compiler/cpp_generator.cc | 14 +- test/cpp/qps/client_async.cc | 2 + test/cpp/qps/client_sync.cc | 1 + test/cpp/qps/driver.cc | 1 + tools/doxygen/Doxyfile.c++ | 3 + tools/doxygen/Doxyfile.c++.internal | 3 + tools/run_tests/sources_and_headers.json | 18 + vsprojects/vcxproj/grpc++/grpc++.vcxproj | 3 + .../vcxproj/grpc++/grpc++.vcxproj.filters | 9 + .../grpc++_unsecure/grpc++_unsecure.vcxproj | 3 + .../grpc++_unsecure.vcxproj.filters | 9 + .../grpc_plugin_support.vcxproj | 3 + .../grpc_plugin_support.vcxproj.filters | 9 + 26 files changed, 792 insertions(+), 543 deletions(-) create mode 100644 include/grpc++/impl/codegen/async_stream.h create mode 100644 include/grpc++/impl/codegen/config_protobuf.h create mode 100644 include/grpc++/impl/codegen/proto_utils.h create mode 100644 include/grpc++/impl/codegen/stub_options.h diff --git a/BUILD b/BUILD index 21bbd4c6c81..59e4c444706 100644 --- a/BUILD +++ b/BUILD @@ -846,12 +846,15 @@ cc_library( "include/grpc++/support/stub_options.h", "include/grpc++/support/sync_stream.h", "include/grpc++/support/time.h", + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -958,12 +961,15 @@ cc_library( "include/grpc++/support/stub_options.h", "include/grpc++/support/sync_stream.h", "include/grpc++/support/time.h", + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -1014,12 +1020,15 @@ cc_library( "src/compiler/ruby_generator.cc", ], hdrs = [ + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", diff --git a/Makefile b/Makefile index 6dfc6656544..5a5434675f4 100644 --- a/Makefile +++ b/Makefile @@ -3071,12 +3071,15 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/support/stub_options.h \ include/grpc++/support/sync_stream.h \ include/grpc++/support/time.h \ + include/grpc++/impl/codegen/async_stream.h \ include/grpc++/impl/codegen/call.h \ include/grpc++/impl/codegen/call_hook.h \ include/grpc++/impl/codegen/channel_interface.h \ include/grpc++/impl/codegen/client_context.h \ include/grpc++/impl/codegen/completion_queue_tag.h \ include/grpc++/impl/codegen/config.h \ + include/grpc++/impl/codegen/config_protobuf.h \ + include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/security/auth_context.h \ include/grpc++/impl/codegen/serialization_traits.h \ include/grpc++/impl/codegen/server_context.h \ @@ -3359,12 +3362,15 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/support/stub_options.h \ include/grpc++/support/sync_stream.h \ include/grpc++/support/time.h \ + include/grpc++/impl/codegen/async_stream.h \ include/grpc++/impl/codegen/call.h \ include/grpc++/impl/codegen/call_hook.h \ include/grpc++/impl/codegen/channel_interface.h \ include/grpc++/impl/codegen/client_context.h \ include/grpc++/impl/codegen/completion_queue_tag.h \ include/grpc++/impl/codegen/config.h \ + include/grpc++/impl/codegen/config_protobuf.h \ + include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/security/auth_context.h \ include/grpc++/impl/codegen/serialization_traits.h \ include/grpc++/impl/codegen/server_context.h \ @@ -3454,12 +3460,15 @@ LIBGRPC_PLUGIN_SUPPORT_SRC = \ src/compiler/ruby_generator.cc \ PUBLIC_HEADERS_CXX += \ + include/grpc++/impl/codegen/async_stream.h \ include/grpc++/impl/codegen/call.h \ include/grpc++/impl/codegen/call_hook.h \ include/grpc++/impl/codegen/channel_interface.h \ include/grpc++/impl/codegen/client_context.h \ include/grpc++/impl/codegen/completion_queue_tag.h \ include/grpc++/impl/codegen/config.h \ + include/grpc++/impl/codegen/config_protobuf.h \ + include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/security/auth_context.h \ include/grpc++/impl/codegen/serialization_traits.h \ include/grpc++/impl/codegen/server_context.h \ diff --git a/build.yaml b/build.yaml index 894ef0c230a..4df69762a35 100644 --- a/build.yaml +++ b/build.yaml @@ -102,12 +102,16 @@ filegroups: - src/cpp/util/time.cc - name: grpc++_codegen public_headers: + - include/grpc++/impl/codegen/async_stream.h - include/grpc++/impl/codegen/call.h - include/grpc++/impl/codegen/call_hook.h - include/grpc++/impl/codegen/channel_interface.h - include/grpc++/impl/codegen/client_context.h + - include/grpc++/impl/codegen/stub_options.h - include/grpc++/impl/codegen/completion_queue_tag.h - include/grpc++/impl/codegen/config.h + - include/grpc++/impl/codegen/config_protobuf.h + - include/grpc++/impl/codegen/proto_utils.h - include/grpc++/impl/codegen/security/auth_context.h - include/grpc++/impl/codegen/serialization_traits.h - include/grpc++/impl/codegen/server_context.h diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h new file mode 100644 index 00000000000..b0410485f85 --- /dev/null +++ b/include/grpc++/impl/codegen/async_stream.h @@ -0,0 +1,462 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H +#define GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H + +#include +#include +#include +#include +#include + +namespace grpc { + +class CompletionQueue; + +/// Common interface for all client side asynchronous streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + /// Request notification of the reading of the initial metadata. Completion + /// will be notified by \a tag on the associated completion queue. + /// + /// \param[in] tag Tag identifying this request. + virtual void ReadInitialMetadata(void* tag) = 0; + + /// Request notification completion. + /// + /// \param[out] status To be updated with the operation status. + /// \param[in] tag Tag identifying this request. + virtual void Finish(Status* status, void* tag) = 0; +}; + +/// An interface that yields a sequence of messages of type \a R. +template +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} + + /// Read a message of type \a R into \a msg. Completion will be notified by \a + /// tag on the associated completion queue. + /// + /// \param[out] msg Where to eventually store the read message. + /// \param[in] tag The tag identifying the operation. + virtual void Read(R* msg, void* tag) = 0; +}; + +/// An interface that can be fed a sequence of messages of type \a W. +template +class AsyncWriterInterface { + public: + virtual ~AsyncWriterInterface() {} + + /// Request the writing of \a msg with identifying tag \a tag. + /// + /// Only one write may be outstanding at any given time. This means that + /// after calling Write, one must wait to receive \a tag from the completion + /// queue BEFORE calling Write again. + /// + /// \param[in] msg The message to be written. + /// \param[in] tag The tag identifying the operation. + virtual void Write(const W& msg, void* tag) = 0; +}; + +template +class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, + public AsyncReaderInterface {}; + +template +class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { + public: + /// Create a stream and write the first request out. + template + ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + const W& request, void* tag) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); + } + + void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) GRPC_OVERRIDE { + read_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + read_ops_.RecvInitialMetadata(context_); + } + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + void Finish(Status* status, void* tag) GRPC_OVERRIDE { + finish_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); + } + + private: + ClientContext* context_; + Call call_; + CallOpSet + init_ops_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet finish_ops_; +}; + +/// Common interface for client side asynchronous writing. +template +class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, + public AsyncWriterInterface { + public: + /// Signal the client is done with the writes. + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +template +class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { + public: + template + ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + R* response, void* tag) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + finish_ops_.RecvMessage(response); + + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); + } + + void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); + } + + void Write(const W& msg, void* tag) GRPC_OVERRIDE { + write_ops_.set_output_tag(tag); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void WritesDone(void* tag) GRPC_OVERRIDE { + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); + } + + void Finish(Status* status, void* tag) GRPC_OVERRIDE { + finish_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); + } + + private: + ClientContext* context_; + Call call_; + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet write_ops_; + CallOpSet writes_done_ops_; + CallOpSet finish_ops_; +}; + +/// Client-side interface for asynchronous bi-directional streaming. +template +class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { + public: + /// Signal the client is done with the writes. + /// + /// \param[in] tag The tag identifying the operation. + virtual void WritesDone(void* tag) = 0; +}; + +template +class ClientAsyncReaderWriter GRPC_FINAL + : public ClientAsyncReaderWriterInterface { + public: + ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + void* tag) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); + } + + void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) GRPC_OVERRIDE { + read_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + read_ops_.RecvInitialMetadata(context_); + } + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + void Write(const W& msg, void* tag) GRPC_OVERRIDE { + write_ops_.set_output_tag(tag); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void WritesDone(void* tag) GRPC_OVERRIDE { + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); + } + + void Finish(Status* status, void* tag) GRPC_OVERRIDE { + finish_ops_.set_output_tag(tag); + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); + } + + private: + ClientContext* context_; + Call call_; + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet write_ops_; + CallOpSet writes_done_ops_; + CallOpSet finish_ops_; +}; + +template +class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, + public AsyncReaderInterface { + public: + explicit ServerAsyncReader(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) GRPC_OVERRIDE { + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + void Finish(const W& msg, const Status& status, void* tag) { + finish_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.ok()) { + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + finish_ops_.SendMessage(msg)); + } else { + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + } + call_.PerformOps(&finish_ops_); + } + + void FinishWithError(const Status& status, void* tag) { + GPR_ASSERT(!status.ok()); + finish_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); + } + + private: + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet finish_ops_; +}; + +template +class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, + public AsyncWriterInterface { + public: + explicit ServerAsyncWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_ops_); + } + + void Write(const W& msg, void* tag) GRPC_OVERRIDE { + write_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void Finish(const Status& status, void* tag) { + finish_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); + } + + private: + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpSet meta_ops_; + CallOpSet write_ops_; + CallOpSet finish_ops_; +}; + +/// Server-side interface for asynchronous bi-directional streaming. +template +class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { + public: + explicit ServerAsyncReaderWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) GRPC_OVERRIDE { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_ops_); + } + + void Read(R* msg, void* tag) GRPC_OVERRIDE { + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); + } + + void Write(const W& msg, void* tag) GRPC_OVERRIDE { + write_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); + } + + void Finish(const Status& status, void* tag) { + finish_ops_.set_output_tag(tag); + if (!ctx_->sent_initial_metadata_) { + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); + } + + private: + friend class ::grpc::Server; + + void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet write_ops_; + CallOpSet finish_ops_; +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h new file mode 100644 index 00000000000..f1b6beaca73 --- /dev/null +++ b/include/grpc++/impl/codegen/config_protobuf.h @@ -0,0 +1,72 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H +#define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H + +#ifndef GRPC_CUSTOM_PROTOBUF_INT64 +#include +#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 +#endif + +#ifndef GRPC_CUSTOM_MESSAGE +#include +#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message +#endif + +#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM +#include +#include +#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ + ::google::protobuf::io::ZeroCopyOutputStream +#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ + ::google::protobuf::io::ZeroCopyInputStream +#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream +#endif + +namespace grpc { +namespace protobuf { + +typedef GRPC_CUSTOM_MESSAGE Message; +typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; + +namespace io { +typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; +typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; +typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; +} // namespace io + +} // namespace protobuf +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h new file mode 100644 index 00000000000..9b81853652d --- /dev/null +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_PROTO_UTILS_H +#define GRPCXX_IMPL_CODEGEN_PROTO_UTILS_H + +#include + +#include +#include +#include +#include + +namespace grpc { + +// Serialize the msg into a buffer created inside the function. The caller +// should destroy the returned buffer when done with it. If serialization fails, +// false is returned and buffer is left unchanged. +Status SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer); + +// The caller keeps ownership of buffer and msg. +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size); + +template +class SerializationTraits::value>::type> { + public: + static Status Serialize(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer, bool* own_buffer) { + *own_buffer = true; + return SerializeProto(msg, buffer); + } + static Status Deserialize(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) { + auto status = DeserializeProto(buffer, msg, max_message_size); + grpc_byte_buffer_destroy(buffer); + return status; + } +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_PROTO_UTILS_H diff --git a/include/grpc++/impl/codegen/stub_options.h b/include/grpc++/impl/codegen/stub_options.h new file mode 100644 index 00000000000..8e966a8dbf3 --- /dev/null +++ b/include/grpc++/impl/codegen/stub_options.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_STUB_OPTIONS_H +#define GRPCXX_IMPL_CODEGEN_STUB_OPTIONS_H + +namespace grpc { + +class StubOptions {}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_STUB_OPTIONS_H diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h index 283e33486df..7110a4d9ac6 100644 --- a/include/grpc++/impl/proto_utils.h +++ b/include/grpc++/impl/proto_utils.h @@ -34,43 +34,6 @@ #ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H #define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H -#include - -#include -#include -#include -#include - -namespace grpc { - -// Serialize the msg into a buffer created inside the function. The caller -// should destroy the returned buffer when done with it. If serialization fails, -// false is returned and buffer is left unchanged. -Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** buffer); - -// The caller keeps ownership of buffer and msg. -Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size); - -template -class SerializationTraits::value>::type> { - public: - static Status Serialize(const grpc::protobuf::Message& msg, - grpc_byte_buffer** buffer, bool* own_buffer) { - *own_buffer = true; - return SerializeProto(msg, buffer); - } - static Status Deserialize(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, - int max_message_size) { - auto status = DeserializeProto(buffer, msg, max_message_size); - grpc_byte_buffer_destroy(buffer); - return status; - } -}; - -} // namespace grpc +#include #endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H diff --git a/include/grpc++/support/async_stream.h b/include/grpc++/support/async_stream.h index 5a811f9217c..88147ea9d65 100644 --- a/include/grpc++/support/async_stream.h +++ b/include/grpc++/support/async_stream.h @@ -34,429 +34,6 @@ #ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H #define GRPCXX_SUPPORT_ASYNC_STREAM_H -#include -#include -#include -#include -#include - -namespace grpc { - -class CompletionQueue; - -/// Common interface for all client side asynchronous streaming. -class ClientAsyncStreamingInterface { - public: - virtual ~ClientAsyncStreamingInterface() {} - - /// Request notification of the reading of the initial metadata. Completion - /// will be notified by \a tag on the associated completion queue. - /// - /// \param[in] tag Tag identifying this request. - virtual void ReadInitialMetadata(void* tag) = 0; - - /// Request notification completion. - /// - /// \param[out] status To be updated with the operation status. - /// \param[in] tag Tag identifying this request. - virtual void Finish(Status* status, void* tag) = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. -template -class AsyncReaderInterface { - public: - virtual ~AsyncReaderInterface() {} - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - virtual void Read(R* msg, void* tag) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. -template -class AsyncWriterInterface { - public: - virtual ~AsyncWriterInterface() {} - - /// Request the writing of \a msg with identifying tag \a tag. - /// - /// Only one write may be outstanding at any given time. This means that - /// after calling Write, one must wait to receive \a tag from the completion - /// queue BEFORE calling Write again. - /// - /// \param[in] msg The message to be written. - /// \param[in] tag The tag identifying the operation. - virtual void Write(const W& msg, void* tag) = 0; -}; - -template -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface {}; - -template -class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { - public: - /// Create a stream and write the first request out. - template - ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet - init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template -class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { - public: - template - ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet init_ops_; - CallOpSet meta_ops_; - CallOpSet write_ops_; - CallOpSet writes_done_ops_; - CallOpSet finish_ops_; -}; - -/// Client-side interface for asynchronous bi-directional streaming. -template -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template -class ClientAsyncReaderWriter GRPC_FINAL - : public ClientAsyncReaderWriterInterface { - public: - ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet write_ops_; - CallOpSet writes_done_ops_; - CallOpSet finish_ops_; -}; - -template -class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncReaderInterface { - public: - explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(const W& msg, const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); - } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_ops_); - } - - void FinishWithError(const Status& status, void* tag) { - GPR_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; -}; - -template -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface { - public: - explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet write_ops_; - CallOpSet finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template -class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { - public: - explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class ::grpc::Server; - - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet write_ops_; - CallOpSet finish_ops_; -}; - -} // namespace grpc +#include #endif // GRPCXX_SUPPORT_ASYNC_STREAM_H diff --git a/include/grpc++/support/async_unary_call.h b/include/grpc++/support/async_unary_call.h index 55ca56b6a6f..3a601e063d3 100644 --- a/include/grpc++/support/async_unary_call.h +++ b/include/grpc++/support/async_unary_call.h @@ -34,17 +34,18 @@ #ifndef GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H #define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H -#include +#include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include namespace grpc { +class CompletionQueue; + template class ClientAsyncResponseReaderInterface { public: diff --git a/include/grpc++/support/config_protobuf.h b/include/grpc++/support/config_protobuf.h index 8235590d413..3e7f169652d 100644 --- a/include/grpc++/support/config_protobuf.h +++ b/include/grpc++/support/config_protobuf.h @@ -34,39 +34,6 @@ #ifndef GRPCXX_SUPPORT_CONFIG_PROTOBUF_H #define GRPCXX_SUPPORT_CONFIG_PROTOBUF_H -#ifndef GRPC_CUSTOM_PROTOBUF_INT64 -#include -#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 -#endif - -#ifndef GRPC_CUSTOM_MESSAGE -#include -#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message -#endif - -#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM -#include -#include -#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ - ::google::protobuf::io::ZeroCopyOutputStream -#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ - ::google::protobuf::io::ZeroCopyInputStream -#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream -#endif - -namespace grpc { -namespace protobuf { - -typedef GRPC_CUSTOM_MESSAGE Message; -typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; - -namespace io { -typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; -typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; -typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; -} // namespace io - -} // namespace protobuf -} // namespace grpc +#include #endif // GRPCXX_SUPPORT_CONFIG_PROTOBUF_H diff --git a/include/grpc++/support/stub_options.h b/include/grpc++/support/stub_options.h index 973aa9bc838..17a13c357e6 100644 --- a/include/grpc++/support/stub_options.h +++ b/include/grpc++/support/stub_options.h @@ -34,10 +34,6 @@ #ifndef GRPCXX_SUPPORT_STUB_OPTIONS_H #define GRPCXX_SUPPORT_STUB_OPTIONS_H -namespace grpc { - -class StubOptions {}; - -} // namespace grpc +#include #endif // GRPCXX_SUPPORT_STUB_OPTIONS_H diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h index 3f3acc5308e..77066ada62f 100644 --- a/include/grpc++/support/sync_stream.h +++ b/include/grpc++/support/sync_stream.h @@ -34,18 +34,18 @@ #ifndef GRPCXX_SUPPORT_SYNC_STREAM_H #define GRPCXX_SUPPORT_SYNC_STREAM_H -#include -#include +#include #include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include namespace grpc { +class CompletionQueue; + /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -121,7 +121,9 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { template ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(new CompletionQueue), + call_(channel->CreateCall(method, context, cq_.get())) { CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); @@ -129,7 +131,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { GPR_ASSERT(ops.SendMessage(request).ok()); ops.ClientSendClose(); call_.PerformOps(&ops); - cq_.Pluck(&ops); + cq_->Pluck(&ops); } void WaitForInitialMetadata() GRPC_OVERRIDE { @@ -138,7 +140,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { CallOpSet ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); - cq_.Pluck(&ops); /// status ignored + cq_->Pluck(&ops); /// status ignored } bool Read(R* msg) GRPC_OVERRIDE { @@ -148,7 +150,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { } ops.RecvMessage(msg); call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; + return cq_->Pluck(&ops) && ops.got_message; } Status Finish() GRPC_OVERRIDE { @@ -156,13 +158,13 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); + GPR_ASSERT(cq_->Pluck(&ops)); return status; } private: ClientContext* context_; - CompletionQueue cq_; + std::unique_ptr cq_; Call call_; }; @@ -185,13 +187,15 @@ class ClientWriter : public ClientWriterInterface { template ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(new CompletionQueue), + call_(channel->CreateCall(method, context, cq_.get())) { finish_ops_.RecvMessage(response); CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); - cq_.Pluck(&ops); + cq_->Pluck(&ops); } using WriterInterface::Write; @@ -201,14 +205,14 @@ class ClientWriter : public ClientWriterInterface { return false; } call_.PerformOps(&ops); - return cq_.Pluck(&ops); + return cq_->Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { CallOpSet ops; ops.ClientSendClose(); call_.PerformOps(&ops); - return cq_.Pluck(&ops); + return cq_->Pluck(&ops); } /// Read the final response and wait for the final status. @@ -216,14 +220,14 @@ class ClientWriter : public ClientWriterInterface { Status status; finish_ops_.ClientRecvStatus(context_, &status); call_.PerformOps(&finish_ops_); - GPR_ASSERT(cq_.Pluck(&finish_ops_)); + GPR_ASSERT(cq_->Pluck(&finish_ops_)); return status; } private: ClientContext* context_; CallOpSet finish_ops_; - CompletionQueue cq_; + std::unique_ptr cq_; Call call_; }; @@ -251,11 +255,13 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(new CompletionQueue), + call_(channel->CreateCall(method, context, cq_.get())) { CallOpSet ops; ops.SendInitialMetadata(context->send_initial_metadata_); call_.PerformOps(&ops); - cq_.Pluck(&ops); + cq_->Pluck(&ops); } void WaitForInitialMetadata() GRPC_OVERRIDE { @@ -264,7 +270,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { CallOpSet ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored + cq_->Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { @@ -274,7 +280,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { } ops.RecvMessage(msg); call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; + return cq_->Pluck(&ops) && ops.got_message; } using WriterInterface::Write; @@ -282,14 +288,14 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { CallOpSet ops; if (!ops.SendMessage(msg, options).ok()) return false; call_.PerformOps(&ops); - return cq_.Pluck(&ops); + return cq_->Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { CallOpSet ops; ops.ClientSendClose(); call_.PerformOps(&ops); - return cq_.Pluck(&ops); + return cq_->Pluck(&ops); } Status Finish() GRPC_OVERRIDE { @@ -297,13 +303,13 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); + GPR_ASSERT(cq_->Pluck(&ops)); return status; } private: ClientContext* context_; - CompletionQueue cq_; + std::unique_ptr cq_; Call call_; }; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index c491eb88c88..30019131816 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -112,13 +112,13 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file, grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string temp = - "#include \n" + "#include \n" + "#include \n" + "#include \n" "#include \n" - "#include \n" - "#include \n" + "#include \n" "#include \n" - "#include \n" - "#include \n" + "#include \n" "#include \n" "\n" "namespace grpc {\n" @@ -863,10 +863,10 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file, printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); - printer.Print(vars, "#include \n"); + printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); - printer.Print(vars, "#include \n"); + printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); if (!file->package().empty()) { diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index d02769d8e63..60ff7d9a320 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -43,6 +43,8 @@ #include #include +#include +#include #include #include #include diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 92fbf240ce2..d93537b279b 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -41,6 +41,7 @@ #include #include +#include #include #include #include diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index acb265b3086..4741d4e3273 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -39,6 +39,7 @@ #include #include #include +#include #include #include diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 687dd328b1d..82e687e5fb8 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -803,12 +803,15 @@ include/grpc++/support/string_ref.h \ include/grpc++/support/stub_options.h \ include/grpc++/support/sync_stream.h \ include/grpc++/support/time.h \ +include/grpc++/impl/codegen/async_stream.h \ include/grpc++/impl/codegen/call.h \ include/grpc++/impl/codegen/call_hook.h \ include/grpc++/impl/codegen/channel_interface.h \ include/grpc++/impl/codegen/client_context.h \ include/grpc++/impl/codegen/completion_queue_tag.h \ include/grpc++/impl/codegen/config.h \ +include/grpc++/impl/codegen/config_protobuf.h \ +include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/security/auth_context.h \ include/grpc++/impl/codegen/serialization_traits.h \ include/grpc++/impl/codegen/server_context.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index d7968279f8b..5df2bcf7f4d 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -803,12 +803,15 @@ include/grpc++/support/string_ref.h \ include/grpc++/support/stub_options.h \ include/grpc++/support/sync_stream.h \ include/grpc++/support/time.h \ +include/grpc++/impl/codegen/async_stream.h \ include/grpc++/impl/codegen/call.h \ include/grpc++/impl/codegen/call_hook.h \ include/grpc++/impl/codegen/channel_interface.h \ include/grpc++/impl/codegen/client_context.h \ include/grpc++/impl/codegen/completion_queue_tag.h \ include/grpc++/impl/codegen/config.h \ +include/grpc++/impl/codegen/config_protobuf.h \ +include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/security/auth_context.h \ include/grpc++/impl/codegen/serialization_traits.h \ include/grpc++/impl/codegen/server_context.h \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index bf481afd80a..2b3a9a9fc72 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -3900,12 +3900,15 @@ "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -3973,12 +3976,15 @@ "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -4130,12 +4136,15 @@ "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -4200,12 +4209,15 @@ "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -4289,12 +4301,15 @@ { "deps": [], "headers": [ + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", @@ -4346,12 +4361,15 @@ "language": "c++", "name": "grpc_plugin_support", "src": [ + "include/grpc++/impl/codegen/async_stream.h", "include/grpc++/impl/codegen/call.h", "include/grpc++/impl/codegen/call_hook.h", "include/grpc++/impl/codegen/channel_interface.h", "include/grpc++/impl/codegen/client_context.h", "include/grpc++/impl/codegen/completion_queue_tag.h", "include/grpc++/impl/codegen/config.h", + "include/grpc++/impl/codegen/config_protobuf.h", + "include/grpc++/impl/codegen/proto_utils.h", "include/grpc++/impl/codegen/security/auth_context.h", "include/grpc++/impl/codegen/serialization_traits.h", "include/grpc++/impl/codegen/server_context.h", diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index b4b0fea1233..35b1c58bb3a 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -301,12 +301,15 @@ + + + diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index 9f7a254e1f8..d7c641e3ca0 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -228,6 +228,9 @@ include\grpc++\support + + include\grpc++\impl\codegen + include\grpc++\impl\codegen @@ -246,6 +249,12 @@ include\grpc++\impl\codegen + + include\grpc++\impl\codegen + + + include\grpc++\impl\codegen + include\grpc++\impl\codegen\security diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index e884bf51d5f..1a565c1893d 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -301,12 +301,15 @@ + + + diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index 78bddc4c66e..4c3ee549eb5 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -213,6 +213,9 @@ include\grpc++\support + + include\grpc++\impl\codegen + include\grpc++\impl\codegen @@ -231,6 +234,12 @@ include\grpc++\impl\codegen + + include\grpc++\impl\codegen + + + include\grpc++\impl\codegen + include\grpc++\impl\codegen\security diff --git a/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj b/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj index 350addf6b7d..6d481ac373c 100644 --- a/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj +++ b/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj @@ -147,12 +147,15 @@ + + + diff --git a/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj.filters b/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj.filters index dca287df85a..c2ca0c5285b 100644 --- a/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_plugin_support/grpc_plugin_support.vcxproj.filters @@ -18,6 +18,9 @@ + + include\grpc++\impl\codegen + include\grpc++\impl\codegen @@ -36,6 +39,12 @@ include\grpc++\impl\codegen + + include\grpc++\impl\codegen + + + include\grpc++\impl\codegen + include\grpc++\impl\codegen\security