|
|
|
@ -20,6 +20,7 @@ |
|
|
|
|
#define GRPCPP_SUPPORT_ASYNC_STREAM_H |
|
|
|
|
|
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpcpp/impl/call.h> |
|
|
|
|
#include <grpcpp/impl/channel_interface.h> |
|
|
|
|
#include <grpcpp/impl/codegen/core_codegen_interface.h> |
|
|
|
@ -200,7 +201,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { |
|
|
|
|
public: |
|
|
|
|
// always allocated against a call arena, no memory free required
|
|
|
|
|
static void operator delete(void* /*ptr*/, std::size_t size) { |
|
|
|
|
GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader)); |
|
|
|
|
GPR_ASSERT(size == sizeof(ClientAsyncReader)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This operator should never be called as the memory should be freed as part
|
|
|
|
@ -208,10 +209,10 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { |
|
|
|
|
// delete to the operator new so that some compilers will not complain (see
|
|
|
|
|
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
|
|
|
|
|
// there are no tests catching the compiler warning.
|
|
|
|
|
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } |
|
|
|
|
static void operator delete(void*, void*) { GPR_ASSERT(false); } |
|
|
|
|
|
|
|
|
|
void StartCall(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!started_); |
|
|
|
|
GPR_ASSERT(!started_); |
|
|
|
|
started_ = true; |
|
|
|
|
StartCallInternal(tag); |
|
|
|
|
} |
|
|
|
@ -225,8 +226,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { |
|
|
|
|
/// calling code can access the received metadata through the
|
|
|
|
|
/// \a ClientContext.
|
|
|
|
|
void ReadInitialMetadata(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -234,7 +235,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(R* msg, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
read_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
read_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -249,7 +250,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { |
|
|
|
|
/// - the \a ClientContext associated with this call is updated with
|
|
|
|
|
/// possible initial and trailing metadata received from the server.
|
|
|
|
|
void Finish(grpc::Status* status, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
finish_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -265,12 +266,12 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { |
|
|
|
|
const W& request, bool start, void* tag) |
|
|
|
|
: context_(context), call_(call), started_(start) { |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); |
|
|
|
|
GPR_ASSERT(init_ops_.SendMessage(request).ok()); |
|
|
|
|
init_ops_.ClientSendClose(); |
|
|
|
|
if (start) { |
|
|
|
|
StartCallInternal(tag); |
|
|
|
|
} else { |
|
|
|
|
GPR_CODEGEN_ASSERT(tag == nullptr); |
|
|
|
|
GPR_ASSERT(tag == nullptr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -348,7 +349,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { |
|
|
|
|
public: |
|
|
|
|
// always allocated against a call arena, no memory free required
|
|
|
|
|
static void operator delete(void* /*ptr*/, std::size_t size) { |
|
|
|
|
GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter)); |
|
|
|
|
GPR_ASSERT(size == sizeof(ClientAsyncWriter)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This operator should never be called as the memory should be freed as part
|
|
|
|
@ -356,10 +357,10 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { |
|
|
|
|
// delete to the operator new so that some compilers will not complain (see
|
|
|
|
|
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
|
|
|
|
|
// there are no tests catching the compiler warning.
|
|
|
|
|
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } |
|
|
|
|
static void operator delete(void*, void*) { GPR_ASSERT(false); } |
|
|
|
|
|
|
|
|
|
void StartCall(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!started_); |
|
|
|
|
GPR_ASSERT(!started_); |
|
|
|
|
started_ = true; |
|
|
|
|
StartCallInternal(tag); |
|
|
|
|
} |
|
|
|
@ -372,8 +373,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { |
|
|
|
|
/// associated with this call is updated, and the calling code can access
|
|
|
|
|
/// the received metadata through the \a ClientContext.
|
|
|
|
|
void ReadInitialMetadata(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -381,27 +382,27 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, grpc::WriteOptions options, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
if (options.is_last_message()) { |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
write_ops_.ClientSendClose(); |
|
|
|
|
} |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void WritesDone(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
write_ops_.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
@ -415,7 +416,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { |
|
|
|
|
/// - attempts to fill in the \a response parameter passed to this class's
|
|
|
|
|
/// constructor with the server's response message.
|
|
|
|
|
void Finish(grpc::Status* status, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
finish_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -435,7 +436,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { |
|
|
|
|
if (start) { |
|
|
|
|
StartCallInternal(tag); |
|
|
|
|
} else { |
|
|
|
|
GPR_CODEGEN_ASSERT(tag == nullptr); |
|
|
|
|
GPR_ASSERT(tag == nullptr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -515,7 +516,7 @@ class ClientAsyncReaderWriter final |
|
|
|
|
public: |
|
|
|
|
// always allocated against a call arena, no memory free required
|
|
|
|
|
static void operator delete(void* /*ptr*/, std::size_t size) { |
|
|
|
|
GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter)); |
|
|
|
|
GPR_ASSERT(size == sizeof(ClientAsyncReaderWriter)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This operator should never be called as the memory should be freed as part
|
|
|
|
@ -523,10 +524,10 @@ class ClientAsyncReaderWriter final |
|
|
|
|
// delete to the operator new so that some compilers will not complain (see
|
|
|
|
|
// https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
|
|
|
|
|
// there are no tests catching the compiler warning.
|
|
|
|
|
static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } |
|
|
|
|
static void operator delete(void*, void*) { GPR_ASSERT(false); } |
|
|
|
|
|
|
|
|
|
void StartCall(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!started_); |
|
|
|
|
GPR_ASSERT(!started_); |
|
|
|
|
started_ = true; |
|
|
|
|
StartCallInternal(tag); |
|
|
|
|
} |
|
|
|
@ -539,8 +540,8 @@ class ClientAsyncReaderWriter final |
|
|
|
|
/// is updated with it, and then the receiving initial metadata can
|
|
|
|
|
/// be accessed through this \a ClientContext.
|
|
|
|
|
void ReadInitialMetadata(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(!context_->initial_metadata_received_); |
|
|
|
|
|
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -548,7 +549,7 @@ class ClientAsyncReaderWriter final |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Read(R* msg, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
read_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
read_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -558,27 +559,27 @@ class ClientAsyncReaderWriter final |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Write(const W& msg, grpc::WriteOptions options, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
if (options.is_last_message()) { |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
write_ops_.ClientSendClose(); |
|
|
|
|
} |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void WritesDone(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
write_ops_.ClientSendClose(); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
@ -589,7 +590,7 @@ class ClientAsyncReaderWriter final |
|
|
|
|
/// - the \a ClientContext associated with this call is updated with
|
|
|
|
|
/// possible initial and trailing metadata sent from the server.
|
|
|
|
|
void Finish(grpc::Status* status, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(started_); |
|
|
|
|
GPR_ASSERT(started_); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!context_->initial_metadata_received_) { |
|
|
|
|
finish_ops_.RecvInitialMetadata(context_); |
|
|
|
@ -606,7 +607,7 @@ class ClientAsyncReaderWriter final |
|
|
|
|
if (start) { |
|
|
|
|
StartCallInternal(tag); |
|
|
|
|
} else { |
|
|
|
|
GPR_CODEGEN_ASSERT(tag == nullptr); |
|
|
|
|
GPR_ASSERT(tag == nullptr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -706,7 +707,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { |
|
|
|
|
/// - The initial metadata that will be sent to the client from this op will
|
|
|
|
|
/// be taken from the \a ServerContext associated with the call.
|
|
|
|
|
void SendInitialMetadata(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, |
|
|
|
@ -765,7 +766,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { |
|
|
|
|
/// gRPC doesn't take ownership or a reference to \a status, so it is safe to
|
|
|
|
|
/// to deallocate once FinishWithError returns.
|
|
|
|
|
void FinishWithError(const grpc::Status& status, void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!status.ok()); |
|
|
|
|
GPR_ASSERT(!status.ok()); |
|
|
|
|
finish_ops_.set_output_tag(tag); |
|
|
|
|
if (!ctx_->sent_initial_metadata_) { |
|
|
|
|
finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, |
|
|
|
@ -855,7 +856,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { |
|
|
|
|
///
|
|
|
|
|
/// \param[in] tag Tag identifying this request.
|
|
|
|
|
void SendInitialMetadata(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, |
|
|
|
@ -871,7 +872,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
EnsureInitialMetadataSent(&write_ops_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -883,7 +884,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { |
|
|
|
|
|
|
|
|
|
EnsureInitialMetadataSent(&write_ops_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -902,7 +903,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
EnsureInitialMetadataSent(&write_ops_); |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
@ -1021,7 +1022,7 @@ class ServerAsyncReaderWriter final |
|
|
|
|
///
|
|
|
|
|
/// \param[in] tag Tag identifying this request.
|
|
|
|
|
void SendInitialMetadata(void* tag) override { |
|
|
|
|
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
GPR_ASSERT(!ctx_->sent_initial_metadata_); |
|
|
|
|
|
|
|
|
|
meta_ops_.set_output_tag(tag); |
|
|
|
|
meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, |
|
|
|
@ -1043,7 +1044,7 @@ class ServerAsyncReaderWriter final |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
EnsureInitialMetadataSent(&write_ops_); |
|
|
|
|
// TODO(ctiller): don't assert
|
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1053,7 +1054,7 @@ class ServerAsyncReaderWriter final |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
} |
|
|
|
|
EnsureInitialMetadataSent(&write_ops_); |
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1073,7 +1074,7 @@ class ServerAsyncReaderWriter final |
|
|
|
|
write_ops_.set_output_tag(tag); |
|
|
|
|
EnsureInitialMetadataSent(&write_ops_); |
|
|
|
|
options.set_buffer_hint(); |
|
|
|
|
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); |
|
|
|
|
write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); |
|
|
|
|
call_.PerformOps(&write_ops_); |
|
|
|
|
} |
|
|
|
|