diff --git a/BUILD b/BUILD index 620a954a5af..3847795f4db 100644 --- a/BUILD +++ b/BUILD @@ -735,6 +735,7 @@ cc_library( "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -746,8 +747,8 @@ cc_library( "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h", ], @@ -821,6 +822,7 @@ cc_library( "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -832,8 +834,8 @@ cc_library( "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h", ], diff --git a/Makefile b/Makefile index 9dffab01c02..626c155f64c 100644 --- a/Makefile +++ b/Makefile @@ -4654,6 +4654,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/server_builder.h \ include/grpc++/server_context.h \ include/grpc++/server_credentials.h \ + include/grpc++/support/async_stream.h \ include/grpc++/support/async_unary_call.h \ include/grpc++/support/auth_context.h \ include/grpc++/support/byte_buffer.h \ @@ -4665,8 +4666,8 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/support/slice.h \ include/grpc++/support/status.h \ include/grpc++/support/status_code_enum.h \ - include/grpc++/support/stream.h \ include/grpc++/support/stub_options.h \ + include/grpc++/support/sync_stream.h \ include/grpc++/support/thread_pool_interface.h \ include/grpc++/support/time.h \ @@ -4896,6 +4897,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/server_builder.h \ include/grpc++/server_context.h \ include/grpc++/server_credentials.h \ + include/grpc++/support/async_stream.h \ include/grpc++/support/async_unary_call.h \ include/grpc++/support/auth_context.h \ include/grpc++/support/byte_buffer.h \ @@ -4907,8 +4909,8 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/support/slice.h \ include/grpc++/support/status.h \ include/grpc++/support/status_code_enum.h \ - include/grpc++/support/stream.h \ include/grpc++/support/stub_options.h \ + include/grpc++/support/sync_stream.h \ include/grpc++/support/thread_pool_interface.h \ include/grpc++/support/time.h \ diff --git a/build.json b/build.json index 8eb4f377037..931f541cc84 100644 --- a/build.json +++ b/build.json @@ -55,6 +55,7 @@ "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -66,8 +67,8 @@ "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h" ], diff --git a/include/grpc++/generic/async_generic_service.h b/include/grpc++/generic/async_generic_service.h index 35bc9458242..8578d850ffd 100644 --- a/include/grpc++/generic/async_generic_service.h +++ b/include/grpc++/generic/async_generic_service.h @@ -35,7 +35,7 @@ #define GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H #include -#include +#include struct grpc_server; diff --git a/include/grpc++/generic/generic_stub.h b/include/grpc++/generic/generic_stub.h index 08ed77aefb6..1bb7900b068 100644 --- a/include/grpc++/generic/generic_stub.h +++ b/include/grpc++/generic/generic_stub.h @@ -34,8 +34,8 @@ #ifndef GRPCXX_GENERIC_GENERIC_STUB_H #define GRPCXX_GENERIC_GENERIC_STUB_H +#include #include -#include namespace grpc { diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 0138eb2ac0f..597798c2039 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -42,7 +42,7 @@ #include #include #include -#include +#include namespace grpc { class ServerContext; diff --git a/include/grpc++/support/stream.h b/include/grpc++/support/async_stream.h similarity index 57% rename from include/grpc++/support/stream.h rename to include/grpc++/support/async_stream.h index 89a6dd693db..83807315568 100644 --- a/include/grpc++/support/stream.h +++ b/include/grpc++/support/async_stream.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPCXX_SUPPORT_STREAM_H -#define GRPCXX_SUPPORT_STREAM_H +#ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H +#define GRPCXX_SUPPORT_ASYNC_STREAM_H #include #include @@ -45,348 +45,6 @@ namespace grpc { -// Common interface for all client side streaming. -class ClientStreamingInterface { - public: - virtual ~ClientStreamingInterface() {} - - // Wait until the stream finishes, and return the final status. When the - // client side declares it has no more message to send, either implicitly or - // by calling WritesDone, it needs to make sure there is no more message to - // be received from the server, either implicitly or by getting a false from - // a Read(). - // This function will return either: - // - when all incoming messages have been read and the server has returned - // status - // - OR when the server has returned a non-OK status - virtual Status Finish() = 0; -}; - -// An interface that yields a sequence of R messages. -template -class ReaderInterface { - public: - virtual ~ReaderInterface() {} - - // Blocking read a message and parse to msg. Returns true on success. - // The method returns false when there will be no more incoming messages, - // either because the other side has called WritesDone or the stream has - // failed (or been cancelled). - virtual bool Read(R* msg) = 0; -}; - -// An interface that can be fed a sequence of W messages. -template -class WriterInterface { - public: - virtual ~WriterInterface() {} - - // Blocking write msg to the stream. Returns true on success. - // Returns false when the stream has been closed. - virtual bool Write(const W& msg, const WriteOptions& options) = 0; - - inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } -}; - -template -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface { - public: - virtual void WaitForInitialMetadata() = 0; -}; - -template -class ClientReader GRPC_FINAL : public ClientReaderInterface { - public: - // Blocking create a stream and write the first request out. - template - ClientReader(Channel* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Should only be called before - // the first read. Calling this method is optional, and if it is not called - // the metadata will be available in ClientContext after the first read. - void WaitForInitialMetadata() { - GPR_ASSERT(!context_->initial_metadata_received_); - - CallOpSet ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet> ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - Status Finish() GRPC_OVERRIDE { - CallOpSet ops; - Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - ClientContext* context_; - CompletionQueue cq_; - Call call_; -}; - -template -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface { - public: - virtual bool WritesDone() = 0; -}; - -template -class ClientWriter : public ClientWriterInterface { - public: - // Blocking create a stream. - template - ClientWriter(Channel* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - - CallOpSet ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - using WriterInterface::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() GRPC_OVERRIDE { - CallOpSet ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - // Read the final response and wait for the final status. - Status Finish() GRPC_OVERRIDE { - Status status; - finish_ops_.ClientRecvStatus(context_, &status); - call_.PerformOps(&finish_ops_); - GPR_ASSERT(cq_.Pluck(&finish_ops_)); - return status; - } - - private: - ClientContext* context_; - CallOpSet finish_ops_; - CompletionQueue cq_; - Call call_; -}; - -// Client-side interface for bi-directional streaming. -template -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface, - public ReaderInterface { - public: - virtual void WaitForInitialMetadata() = 0; - virtual bool WritesDone() = 0; -}; - -template -class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { - public: - // Blocking create a stream. - ClientReaderWriter(Channel* channel, const RpcMethod& method, - ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet ops; - ops.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } - - // Blocking wait for initial metadata from server. The received metadata - // can only be accessed after this call returns. Should only be called before - // the first read. Calling this method is optional, and if it is not called - // the metadata will be available in ClientContext after the first read. - void WaitForInitialMetadata() { - GPR_ASSERT(!context_->initial_metadata_received_); - - CallOpSet ops; - ops.RecvInitialMetadata(context_); - call_.PerformOps(&ops); - cq_.Pluck(&ops); // status ignored - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet> ops; - if (!context_->initial_metadata_received_) { - ops.RecvInitialMetadata(context_); - } - ops.RecvMessage(msg); - call_.PerformOps(&ops); - return cq_.Pluck(&ops) && ops.got_message; - } - - using WriterInterface::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet ops; - if (!ops.SendMessage(msg, options).ok()) return false; - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - bool WritesDone() GRPC_OVERRIDE { - CallOpSet ops; - ops.ClientSendClose(); - call_.PerformOps(&ops); - return cq_.Pluck(&ops); - } - - Status Finish() GRPC_OVERRIDE { - CallOpSet ops; - Status status; - ops.ClientRecvStatus(context_, &status); - call_.PerformOps(&ops); - GPR_ASSERT(cq_.Pluck(&ops)); - return status; - } - - private: - ClientContext* context_; - CompletionQueue cq_; - Call call_; -}; - -template -class ServerReader GRPC_FINAL : public ReaderInterface { - public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -template -class ServerWriter GRPC_FINAL : public WriterInterface { - public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - using WriterInterface::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - -// Server-side interface for bi-directional streaming. -template -class ServerReaderWriter GRPC_FINAL : public WriterInterface, - public ReaderInterface { - public: - ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - CallOpSet ops; - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&ops); - call_->cq()->Pluck(&ops); - } - - bool Read(R* msg) GRPC_OVERRIDE { - CallOpSet> ops; - ops.RecvMessage(msg); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; - } - - using WriterInterface::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { - CallOpSet ops; - if (!ops.SendMessage(msg, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); - } - - private: - Call* const call_; - ServerContext* const ctx_; -}; - // Async interfaces // Common interface for all client side streaming. class ClientAsyncStreamingInterface { @@ -773,4 +431,4 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, } // namespace grpc -#endif // GRPCXX_SUPPORT_STREAM_H +#endif // GRPCXX_SUPPORT_ASYNC_STREAM_H diff --git a/include/grpc++/support/sync_stream.h b/include/grpc++/support/sync_stream.h new file mode 100644 index 00000000000..b4bb637ff23 --- /dev/null +++ b/include/grpc++/support/sync_stream.h @@ -0,0 +1,392 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_SUPPORT_SYNC_STREAM_H +#define GRPCXX_SUPPORT_SYNC_STREAM_H + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace grpc { + +// Common interface for all client side streaming. +class ClientStreamingInterface { + public: + virtual ~ClientStreamingInterface() {} + + // Wait until the stream finishes, and return the final status. When the + // client side declares it has no more message to send, either implicitly or + // by calling WritesDone, it needs to make sure there is no more message to + // be received from the server, either implicitly or by getting a false from + // a Read(). + // This function will return either: + // - when all incoming messages have been read and the server has returned + // status + // - OR when the server has returned a non-OK status + virtual Status Finish() = 0; +}; + +// An interface that yields a sequence of R messages. +template +class ReaderInterface { + public: + virtual ~ReaderInterface() {} + + // Blocking read a message and parse to msg. Returns true on success. + // The method returns false when there will be no more incoming messages, + // either because the other side has called WritesDone or the stream has + // failed (or been cancelled). + virtual bool Read(R* msg) = 0; +}; + +// An interface that can be fed a sequence of W messages. +template +class WriterInterface { + public: + virtual ~WriterInterface() {} + + // Blocking write msg to the stream. Returns true on success. + // Returns false when the stream has been closed. + virtual bool Write(const W& msg, const WriteOptions& options) = 0; + + inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } +}; + +template +class ClientReaderInterface : public ClientStreamingInterface, + public ReaderInterface { + public: + virtual void WaitForInitialMetadata() = 0; +}; + +template +class ClientReader GRPC_FINAL : public ClientReaderInterface { + public: + // Blocking create a stream and write the first request out. + template + ClientReader(Channel* channel, const RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet> ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + Status Finish() GRPC_OVERRIDE { + CallOpSet ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +template +class ClientWriterInterface : public ClientStreamingInterface, + public WriterInterface { + public: + virtual bool WritesDone() = 0; +}; + +template +class ClientWriter : public ClientWriterInterface { + public: + // Blocking create a stream. + template + ClientWriter(Channel* channel, const RpcMethod& method, + ClientContext* context, R* response) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + + CallOpSet ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + using WriterInterface::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() GRPC_OVERRIDE { + CallOpSet ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + // Read the final response and wait for the final status. + Status Finish() GRPC_OVERRIDE { + Status status; + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); + return status; + } + + private: + ClientContext* context_; + CallOpSet finish_ops_; + CompletionQueue cq_; + Call call_; +}; + +// Client-side interface for bi-directional streaming. +template +class ClientReaderWriterInterface : public ClientStreamingInterface, + public WriterInterface, + public ReaderInterface { + public: + virtual void WaitForInitialMetadata() = 0; + virtual bool WritesDone() = 0; +}; + +template +class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { + public: + // Blocking create a stream. + ClientReaderWriter(Channel* channel, const RpcMethod& method, + ClientContext* context) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + CallOpSet ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + + // Blocking wait for initial metadata from server. The received metadata + // can only be accessed after this call returns. Should only be called before + // the first read. Calling this method is optional, and if it is not called + // the metadata will be available in ClientContext after the first read. + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet> ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; + } + + using WriterInterface::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet ops; + if (!ops.SendMessage(msg, options).ok()) return false; + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + bool WritesDone() GRPC_OVERRIDE { + CallOpSet ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); + } + + Status Finish() GRPC_OVERRIDE { + CallOpSet ops; + Status status; + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); + return status; + } + + private: + ClientContext* context_; + CompletionQueue cq_; + Call call_; +}; + +template +class ServerReader GRPC_FINAL : public ReaderInterface { + public: + ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +template +class ServerWriter GRPC_FINAL : public WriterInterface { + public: + ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + using WriterInterface::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +// Server-side interface for bi-directional streaming. +template +class ServerReaderWriter GRPC_FINAL : public WriterInterface, + public ReaderInterface { + public: + ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + + void SendInitialMetadata() { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + CallOpSet ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); + } + + bool Read(R* msg) GRPC_OVERRIDE { + CallOpSet> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + + using WriterInterface::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + CallOpSet ops; + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + + private: + Call* const call_; + ServerContext* const ctx_; +}; + +} // namespace grpc + +#endif // GRPCXX_SUPPORT_SYNC_STREAM_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 5d82b605fb3..1bf2b16ed6f 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -112,13 +112,14 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file, grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string temp = + "#include \n" "#include \n" "#include \n" "#include \n" "#include \n" "#include \n" - "#include \n" "#include \n" + "#include \n" "\n" "namespace grpc {\n" "class CompletionQueue;\n" @@ -706,7 +707,8 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file, printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); printer.Print(vars, "#include \n"); - printer.Print(vars, "#include \n"); + printer.Print(vars, "#include \n"); + printer.Print(vars, "#include \n"); if (!file->package().empty()) { std::vector parts = diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index ccea4f68dea..d6337c20d79 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -785,6 +785,7 @@ include/grpc++/server.h \ include/grpc++/server_builder.h \ include/grpc++/server_context.h \ include/grpc++/server_credentials.h \ +include/grpc++/support/async_stream.h \ include/grpc++/support/async_unary_call.h \ include/grpc++/support/auth_context.h \ include/grpc++/support/byte_buffer.h \ @@ -796,8 +797,8 @@ include/grpc++/support/fixed_size_thread_pool.h \ include/grpc++/support/slice.h \ include/grpc++/support/status.h \ include/grpc++/support/status_code_enum.h \ -include/grpc++/support/stream.h \ include/grpc++/support/stub_options.h \ +include/grpc++/support/sync_stream.h \ include/grpc++/support/thread_pool_interface.h \ include/grpc++/support/time.h diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index c4a7fb07571..38999f99f04 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -785,6 +785,7 @@ include/grpc++/server.h \ include/grpc++/server_builder.h \ include/grpc++/server_context.h \ include/grpc++/server_credentials.h \ +include/grpc++/support/async_stream.h \ include/grpc++/support/async_unary_call.h \ include/grpc++/support/auth_context.h \ include/grpc++/support/byte_buffer.h \ @@ -796,8 +797,8 @@ include/grpc++/support/fixed_size_thread_pool.h \ include/grpc++/support/slice.h \ include/grpc++/support/status.h \ include/grpc++/support/status_code_enum.h \ -include/grpc++/support/stream.h \ include/grpc++/support/stub_options.h \ +include/grpc++/support/sync_stream.h \ include/grpc++/support/thread_pool_interface.h \ include/grpc++/support/time.h \ src/cpp/client/secure_credentials.h \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 8e6afe87d25..620fbddc9d2 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -13143,6 +13143,7 @@ "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -13154,8 +13155,8 @@ "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h", "src/cpp/client/create_channel_internal.h", @@ -13192,6 +13193,7 @@ "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -13203,8 +13205,8 @@ "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h", "src/cpp/client/channel.cc", @@ -13315,6 +13317,7 @@ "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -13326,8 +13329,8 @@ "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h", "src/cpp/client/create_channel_internal.h", @@ -13361,6 +13364,7 @@ "include/grpc++/server_builder.h", "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", + "include/grpc++/support/async_stream.h", "include/grpc++/support/async_unary_call.h", "include/grpc++/support/auth_context.h", "include/grpc++/support/byte_buffer.h", @@ -13372,8 +13376,8 @@ "include/grpc++/support/slice.h", "include/grpc++/support/status.h", "include/grpc++/support/status_code_enum.h", - "include/grpc++/support/stream.h", "include/grpc++/support/stub_options.h", + "include/grpc++/support/sync_stream.h", "include/grpc++/support/thread_pool_interface.h", "include/grpc++/support/time.h", "src/cpp/client/channel.cc", diff --git a/vsprojects/grpc++/grpc++.vcxproj b/vsprojects/grpc++/grpc++.vcxproj index 5181b3a2004..4eb2dfdb7c9 100644 --- a/vsprojects/grpc++/grpc++.vcxproj +++ b/vsprojects/grpc++/grpc++.vcxproj @@ -238,6 +238,7 @@ + @@ -249,8 +250,8 @@ - + diff --git a/vsprojects/grpc++/grpc++.vcxproj.filters b/vsprojects/grpc++/grpc++.vcxproj.filters index cbffd3c765a..21fbe15a70b 100644 --- a/vsprojects/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/grpc++/grpc++.vcxproj.filters @@ -171,6 +171,9 @@ include\grpc++ + + include\grpc++\support + include\grpc++\support @@ -204,10 +207,10 @@ include\grpc++\support - + include\grpc++\support - + include\grpc++\support diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj index 77f83086c7e..f7174350a69 100644 --- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -238,6 +238,7 @@ + @@ -249,8 +250,8 @@ - + diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index d4288f8987d..b1df78c7e3e 100644 --- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -156,6 +156,9 @@ include\grpc++ + + include\grpc++\support + include\grpc++\support @@ -189,10 +192,10 @@ include\grpc++\support - + include\grpc++\support - + include\grpc++\support