split stream.h into sync_stream.h and async_stream.h

pull/3028/head
yang-g 9 years ago
parent 9e2f90cd06
commit 9fb35a5332
  1. 6
      BUILD
  2. 6
      Makefile
  3. 3
      build.json
  4. 2
      include/grpc++/generic/async_generic_service.h
  5. 2
      include/grpc++/generic/generic_stub.h
  6. 2
      include/grpc++/impl/rpc_service_method.h
  7. 348
      include/grpc++/support/async_stream.h
  8. 392
      include/grpc++/support/sync_stream.h
  9. 6
      src/compiler/cpp_generator.cc
  10. 3
      tools/doxygen/Doxyfile.c++
  11. 3
      tools/doxygen/Doxyfile.c++.internal
  12. 12
      tools/run_tests/sources_and_headers.json
  13. 3
      vsprojects/grpc++/grpc++.vcxproj
  14. 7
      vsprojects/grpc++/grpc++.vcxproj.filters
  15. 3
      vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
  16. 7
      vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters

@ -735,6 +735,7 @@ cc_library(
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -746,8 +747,8 @@ cc_library(
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h",
],
@ -821,6 +822,7 @@ cc_library(
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -832,8 +834,8 @@ cc_library(
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h",
],

@ -4654,6 +4654,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/server_builder.h \
include/grpc++/server_context.h \
include/grpc++/server_credentials.h \
include/grpc++/support/async_stream.h \
include/grpc++/support/async_unary_call.h \
include/grpc++/support/auth_context.h \
include/grpc++/support/byte_buffer.h \
@ -4665,8 +4666,8 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/support/slice.h \
include/grpc++/support/status.h \
include/grpc++/support/status_code_enum.h \
include/grpc++/support/stream.h \
include/grpc++/support/stub_options.h \
include/grpc++/support/sync_stream.h \
include/grpc++/support/thread_pool_interface.h \
include/grpc++/support/time.h \
@ -4896,6 +4897,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/server_builder.h \
include/grpc++/server_context.h \
include/grpc++/server_credentials.h \
include/grpc++/support/async_stream.h \
include/grpc++/support/async_unary_call.h \
include/grpc++/support/auth_context.h \
include/grpc++/support/byte_buffer.h \
@ -4907,8 +4909,8 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/support/slice.h \
include/grpc++/support/status.h \
include/grpc++/support/status_code_enum.h \
include/grpc++/support/stream.h \
include/grpc++/support/stub_options.h \
include/grpc++/support/sync_stream.h \
include/grpc++/support/thread_pool_interface.h \
include/grpc++/support/time.h \

@ -55,6 +55,7 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -66,8 +67,8 @@
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h"
],

@ -35,7 +35,7 @@
#define GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/stream.h>
#include <grpc++/support/async_stream.h>
struct grpc_server;

@ -34,8 +34,8 @@
#ifndef GRPCXX_GENERIC_GENERIC_STUB_H
#define GRPCXX_GENERIC_GENERIC_STUB_H
#include <grpc++/support/async_stream.h>
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/stream.h>
namespace grpc {

@ -42,7 +42,7 @@
#include <grpc++/impl/rpc_method.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/stream.h>
#include <grpc++/support/sync_stream.h>
namespace grpc {
class ServerContext;

@ -31,8 +31,8 @@
*
*/
#ifndef GRPCXX_SUPPORT_STREAM_H
#define GRPCXX_SUPPORT_STREAM_H
#ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H
#define GRPCXX_SUPPORT_ASYNC_STREAM_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
@ -45,348 +45,6 @@
namespace grpc {
// Common interface for all client side streaming.
class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read().
// This function will return either:
// - when all incoming messages have been read and the server has returned
// status
// - OR when the server has returned a non-OK status
virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
template <class R>
class ReaderInterface {
public:
virtual ~ReaderInterface() {}
// Blocking read a message and parse to msg. Returns true on success.
// The method returns false when there will be no more incoming messages,
// either because the other side has called WritesDone or the stream has
// failed (or been cancelled).
virtual bool Read(R* msg) = 0;
};
// An interface that can be fed a sequence of W messages.
template <class W>
class WriterInterface {
public:
virtual ~WriterInterface() {}
// Blocking write msg to the stream. Returns true on success.
// Returns false when the stream has been closed.
virtual bool Write(const W& msg, const WriteOptions& options) = 0;
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
};
template <class R>
class ClientReaderInterface : public ClientStreamingInterface,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
};
template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
template <class W>
ClientReader(Channel* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class W>
class ClientWriterInterface : public ClientStreamingInterface,
public WriterInterface<W> {
public:
virtual bool WritesDone() = 0;
};
template <class W>
class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
template <class R>
ClientWriter(Channel* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
Status status;
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriterInterface : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
virtual bool WritesDone() = 0;
};
template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
public:
// Blocking create a stream.
ClientReaderWriter(Channel* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class R>
class ServerReader GRPC_FINAL : public ReaderInterface<R> {
public:
ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
Call* const call_;
ServerContext* const ctx_;
};
template <class W>
class ServerWriter GRPC_FINAL : public WriterInterface<W> {
public:
ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
public ReaderInterface<R> {
public:
ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
// Async interfaces
// Common interface for all client side streaming.
class ClientAsyncStreamingInterface {
@ -773,4 +431,4 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
} // namespace grpc
#endif // GRPCXX_SUPPORT_STREAM_H
#endif // GRPCXX_SUPPORT_ASYNC_STREAM_H

@ -0,0 +1,392 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_SUPPORT_SYNC_STREAM_H
#define GRPCXX_SUPPORT_SYNC_STREAM_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
#include <grpc++/support/status.h>
namespace grpc {
// Common interface for all client side streaming.
class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read().
// This function will return either:
// - when all incoming messages have been read and the server has returned
// status
// - OR when the server has returned a non-OK status
virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
template <class R>
class ReaderInterface {
public:
virtual ~ReaderInterface() {}
// Blocking read a message and parse to msg. Returns true on success.
// The method returns false when there will be no more incoming messages,
// either because the other side has called WritesDone or the stream has
// failed (or been cancelled).
virtual bool Read(R* msg) = 0;
};
// An interface that can be fed a sequence of W messages.
template <class W>
class WriterInterface {
public:
virtual ~WriterInterface() {}
// Blocking write msg to the stream. Returns true on success.
// Returns false when the stream has been closed.
virtual bool Write(const W& msg, const WriteOptions& options) = 0;
inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
};
template <class R>
class ClientReaderInterface : public ClientStreamingInterface,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
};
template <class R>
class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
template <class W>
ClientReader(Channel* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(ops.SendMessage(request).ok());
ops.ClientSendClose();
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class W>
class ClientWriterInterface : public ClientStreamingInterface,
public WriterInterface<W> {
public:
virtual bool WritesDone() = 0;
};
template <class W>
class ClientWriter : public ClientWriterInterface<W> {
public:
// Blocking create a stream.
template <class R>
ClientWriter(Channel* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
Status status;
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
return status;
}
private:
ClientContext* context_;
CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriterInterface : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
virtual void WaitForInitialMetadata() = 0;
virtual bool WritesDone() = 0;
};
template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
public:
// Blocking create a stream.
ClientReaderWriter(Channel* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_);
call_.PerformOps(&ops);
cq_.Pluck(&ops);
}
// Blocking wait for initial metadata from server. The received metadata
// can only be accessed after this call returns. Should only be called before
// the first read. Calling this method is optional, and if it is not called
// the metadata will be available in ClientContext after the first read.
void WaitForInitialMetadata() {
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpSet<CallOpRecvInitialMetadata> ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
ops.RecvMessage(msg);
call_.PerformOps(&ops);
return cq_.Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
bool WritesDone() GRPC_OVERRIDE {
CallOpSet<CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
Status Finish() GRPC_OVERRIDE {
CallOpSet<CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
GPR_ASSERT(cq_.Pluck(&ops));
return status;
}
private:
ClientContext* context_;
CompletionQueue cq_;
Call call_;
};
template <class R>
class ServerReader GRPC_FINAL : public ReaderInterface<R> {
public:
ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
Call* const call_;
ServerContext* const ctx_;
};
template <class W>
class ServerWriter GRPC_FINAL : public WriterInterface<W> {
public:
ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
public ReaderInterface<R> {
public:
ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
void SendInitialMetadata() {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
call_->PerformOps(&ops);
call_->cq()->Pluck(&ops);
}
bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
ops.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops);
}
private:
Call* const call_;
ServerContext* const ctx_;
};
} // namespace grpc
#endif // GRPCXX_SUPPORT_SYNC_STREAM_H

@ -112,13 +112,14 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string temp =
"#include <grpc++/support/async_stream.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
"#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/support/async_unary_call.h>\n"
"#include <grpc++/support/status.h>\n"
"#include <grpc++/support/stream.h>\n"
"#include <grpc++/support/stub_options.h>\n"
"#include <grpc++/support/sync_stream.h>\n"
"\n"
"namespace grpc {\n"
"class CompletionQueue;\n"
@ -706,7 +707,8 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
printer.Print(vars, "#include <grpc++/support/async_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/support/stream.h>\n");
printer.Print(vars, "#include <grpc++/support/async_stream.h>\n");
printer.Print(vars, "#include <grpc++/support/sync_stream.h>\n");
if (!file->package().empty()) {
std::vector<grpc::string> parts =

@ -785,6 +785,7 @@ include/grpc++/server.h \
include/grpc++/server_builder.h \
include/grpc++/server_context.h \
include/grpc++/server_credentials.h \
include/grpc++/support/async_stream.h \
include/grpc++/support/async_unary_call.h \
include/grpc++/support/auth_context.h \
include/grpc++/support/byte_buffer.h \
@ -796,8 +797,8 @@ include/grpc++/support/fixed_size_thread_pool.h \
include/grpc++/support/slice.h \
include/grpc++/support/status.h \
include/grpc++/support/status_code_enum.h \
include/grpc++/support/stream.h \
include/grpc++/support/stub_options.h \
include/grpc++/support/sync_stream.h \
include/grpc++/support/thread_pool_interface.h \
include/grpc++/support/time.h

@ -785,6 +785,7 @@ include/grpc++/server.h \
include/grpc++/server_builder.h \
include/grpc++/server_context.h \
include/grpc++/server_credentials.h \
include/grpc++/support/async_stream.h \
include/grpc++/support/async_unary_call.h \
include/grpc++/support/auth_context.h \
include/grpc++/support/byte_buffer.h \
@ -796,8 +797,8 @@ include/grpc++/support/fixed_size_thread_pool.h \
include/grpc++/support/slice.h \
include/grpc++/support/status.h \
include/grpc++/support/status_code_enum.h \
include/grpc++/support/stream.h \
include/grpc++/support/stub_options.h \
include/grpc++/support/sync_stream.h \
include/grpc++/support/thread_pool_interface.h \
include/grpc++/support/time.h \
src/cpp/client/secure_credentials.h \

@ -13143,6 +13143,7 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -13154,8 +13155,8 @@
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h",
"src/cpp/client/create_channel_internal.h",
@ -13192,6 +13193,7 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -13203,8 +13205,8 @@
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h",
"src/cpp/client/channel.cc",
@ -13315,6 +13317,7 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -13326,8 +13329,8 @@
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h",
"src/cpp/client/create_channel_internal.h",
@ -13361,6 +13364,7 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/support/async_stream.h",
"include/grpc++/support/async_unary_call.h",
"include/grpc++/support/auth_context.h",
"include/grpc++/support/byte_buffer.h",
@ -13372,8 +13376,8 @@
"include/grpc++/support/slice.h",
"include/grpc++/support/status.h",
"include/grpc++/support/status_code_enum.h",
"include/grpc++/support/stream.h",
"include/grpc++/support/stub_options.h",
"include/grpc++/support/sync_stream.h",
"include/grpc++/support/thread_pool_interface.h",
"include/grpc++/support/time.h",
"src/cpp/client/channel.cc",

@ -238,6 +238,7 @@
<ClInclude Include="..\..\include\grpc++\server_builder.h" />
<ClInclude Include="..\..\include\grpc++\server_context.h" />
<ClInclude Include="..\..\include\grpc++\server_credentials.h" />
<ClInclude Include="..\..\include\grpc++\support\async_stream.h" />
<ClInclude Include="..\..\include\grpc++\support\async_unary_call.h" />
<ClInclude Include="..\..\include\grpc++\support\auth_context.h" />
<ClInclude Include="..\..\include\grpc++\support\byte_buffer.h" />
@ -249,8 +250,8 @@
<ClInclude Include="..\..\include\grpc++\support\slice.h" />
<ClInclude Include="..\..\include\grpc++\support\status.h" />
<ClInclude Include="..\..\include\grpc++\support\status_code_enum.h" />
<ClInclude Include="..\..\include\grpc++\support\stream.h" />
<ClInclude Include="..\..\include\grpc++\support\stub_options.h" />
<ClInclude Include="..\..\include\grpc++\support\sync_stream.h" />
<ClInclude Include="..\..\include\grpc++\support\thread_pool_interface.h" />
<ClInclude Include="..\..\include\grpc++\support\time.h" />
</ItemGroup>

@ -171,6 +171,9 @@
<ClInclude Include="..\..\include\grpc++\server_credentials.h">
<Filter>include\grpc++</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\async_stream.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\async_unary_call.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
@ -204,10 +207,10 @@
<ClInclude Include="..\..\include\grpc++\support\status_code_enum.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\stream.h">
<ClInclude Include="..\..\include\grpc++\support\stub_options.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\stub_options.h">
<ClInclude Include="..\..\include\grpc++\support\sync_stream.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\thread_pool_interface.h">

@ -238,6 +238,7 @@
<ClInclude Include="..\..\include\grpc++\server_builder.h" />
<ClInclude Include="..\..\include\grpc++\server_context.h" />
<ClInclude Include="..\..\include\grpc++\server_credentials.h" />
<ClInclude Include="..\..\include\grpc++\support\async_stream.h" />
<ClInclude Include="..\..\include\grpc++\support\async_unary_call.h" />
<ClInclude Include="..\..\include\grpc++\support\auth_context.h" />
<ClInclude Include="..\..\include\grpc++\support\byte_buffer.h" />
@ -249,8 +250,8 @@
<ClInclude Include="..\..\include\grpc++\support\slice.h" />
<ClInclude Include="..\..\include\grpc++\support\status.h" />
<ClInclude Include="..\..\include\grpc++\support\status_code_enum.h" />
<ClInclude Include="..\..\include\grpc++\support\stream.h" />
<ClInclude Include="..\..\include\grpc++\support\stub_options.h" />
<ClInclude Include="..\..\include\grpc++\support\sync_stream.h" />
<ClInclude Include="..\..\include\grpc++\support\thread_pool_interface.h" />
<ClInclude Include="..\..\include\grpc++\support\time.h" />
</ItemGroup>

@ -156,6 +156,9 @@
<ClInclude Include="..\..\include\grpc++\server_credentials.h">
<Filter>include\grpc++</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\async_stream.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\async_unary_call.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
@ -189,10 +192,10 @@
<ClInclude Include="..\..\include\grpc++\support\status_code_enum.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\stream.h">
<ClInclude Include="..\..\include\grpc++\support\stub_options.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\stub_options.h">
<ClInclude Include="..\..\include\grpc++\support\sync_stream.h">
<Filter>include\grpc++\support</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc++\support\thread_pool_interface.h">

Loading…
Cancel
Save