From 06e174a088edadae2081ee5843c70cd4ba017f78 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 20 Oct 2017 05:51:12 -0700 Subject: [PATCH 1/2] Separate public and internal C++ interfaces --- include/grpc++/alarm.h | 2 +- include/grpc++/channel.h | 16 +- include/grpc++/impl/codegen/async_stream.h | 264 ++++++++------- .../grpc++/impl/codegen/async_unary_call.h | 69 ++-- include/grpc++/impl/codegen/byte_buffer.h | 21 +- include/grpc++/impl/codegen/call.h | 17 +- include/grpc++/impl/codegen/call_hook.h | 2 + .../grpc++/impl/codegen/channel_interface.h | 29 +- include/grpc++/impl/codegen/client_context.h | 23 +- .../grpc++/impl/codegen/client_unary_call.h | 73 +++-- .../grpc++/impl/codegen/completion_queue.h | 51 +-- .../impl/codegen/completion_queue_tag.h | 2 + include/grpc++/impl/codegen/metadata_map.h | 2 + .../grpc++/impl/codegen/method_handler_impl.h | 2 + include/grpc++/impl/codegen/rpc_method.h | 3 +- .../grpc++/impl/codegen/rpc_service_method.h | 3 +- include/grpc++/impl/codegen/server_context.h | 27 +- .../grpc++/impl/codegen/server_interface.h | 42 +-- include/grpc++/impl/codegen/service_type.h | 46 +-- include/grpc++/impl/codegen/sync_stream.h | 301 +++++++++++------- include/grpc++/impl/codegen/time.h | 6 +- include/grpc++/server.h | 3 +- include/grpc++/server_builder.h | 1 - src/compiler/cpp_generator.cc | 151 ++++----- src/cpp/client/channel_cc.cc | 12 +- src/cpp/client/generic_stub.cc | 10 +- src/cpp/common/completion_queue_cc.cc | 4 +- .../health/default_health_check_service.cc | 9 +- .../health/default_health_check_service.h | 2 +- src/cpp/server/server_cc.cc | 69 ++-- src/cpp/server/server_context.cc | 8 +- test/cpp/codegen/compiler_test_golden | 17 +- test/cpp/microbenchmarks/bm_cq.cc | 2 +- 33 files changed, 732 insertions(+), 557 deletions(-) diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h index 2d88d868e52..b43425e224b 100644 --- a/include/grpc++/alarm.h +++ b/include/grpc++/alarm.h @@ -92,7 +92,7 @@ class Alarm : private GrpcLibraryCodegen { } private: - class AlarmEntry : public CompletionQueueTag { + class AlarmEntry : public internal::CompletionQueueTag { public: AlarmEntry(void* tag) : tag_(tag) {} void Set(void* tag) { tag_ = tag; } diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h index c50091d6ac1..e9fb5a5d097 100644 --- a/include/grpc++/channel.h +++ b/include/grpc++/channel.h @@ -32,7 +32,7 @@ struct grpc_channel; namespace grpc { /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, - public CallHook, + public internal::CallHook, public std::enable_shared_from_this, private GrpcLibraryCodegen { public: @@ -51,18 +51,16 @@ class Channel final : public ChannelInterface, private: template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend class internal::BlockingUnaryCallImpl; friend std::shared_ptr CreateChannelInternal( const grpc::string& host, grpc_channel* c_channel); Channel(const grpc::string& host, grpc_channel* c_channel); - Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) override; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; + internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override; void* RegisterMethod(const char* method) override; void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index e60572fc939..2012b3170b3 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -30,6 +30,7 @@ namespace grpc { class CompletionQueue; +namespace internal { /// Common interface for all client side asynchronous streaming. class ClientAsyncStreamingInterface { public: @@ -151,9 +152,12 @@ class AsyncWriterInterface { } }; +} // namespace internal + template -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface {}; +class ClientAsyncReaderInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncReaderInterface {}; /// Async client-side API for doing server-streaming RPCs, /// where the incoming message stream coming from the server has @@ -161,23 +165,26 @@ class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, template class ClientAsyncReader final : public ClientAsyncReaderInterface { public: - /// Create a stream object. - /// Write the first request out if \a start is set. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. If \a start is not set, \a tag must be - /// nullptr and the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template - static ClientAsyncReader* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, const W& request, - bool start, void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, start, tag); - } + struct internal { + /// Create a stream object. + /// Write the first request out if \a start is set. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template + static ClientAsyncReader* Create(ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request, + bool start, void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, start, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -234,8 +241,8 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { private: template - ClientAsyncReader(Call call, ClientContext* context, const W& request, - bool start, void* tag) + ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, + const W& request, bool start, void* tag) : context_(context), call_(call), started_(start) { // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); @@ -255,19 +262,27 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { } ClientContext* context_; - Call call_; + ::grpc::internal::Call call_; bool started_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; /// Common interface for client side asynchronous writing. template -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface { +class ClientAsyncWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface { public: /// Signal the client is done with the writes (half-close the client stream). /// Thread-safe with respect to \a AsyncReaderInterface::Read @@ -282,27 +297,30 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template class ClientAsyncWriter final : public ClientAsyncWriterInterface { public: - /// Create a stream object. - /// Start the RPC if \a start is set - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// If \a start is not set, \a tag must be nullptr and the actual call - /// must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template - static ClientAsyncWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, const RpcMethod& method, - ClientContext* context, R* response, - bool start, void* tag) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, start, tag); - } + struct internal { + /// Create a stream object. + /// Start the RPC if \a start is set + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, \a tag must be nullptr and the actual call + /// must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template + static ClientAsyncWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response, + bool start, void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, start, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -377,8 +395,8 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { private: template - ClientAsyncWriter(Call call, ClientContext* context, R* response, bool start, - void* tag) + ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, + R* response, bool start, void* tag) : context_(context), call_(call), started_(start) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -401,13 +419,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { } ClientContext* context_; - Call call_; + ::grpc::internal::Call call_; bool started_; - CallOpSet meta_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; }; @@ -415,9 +437,10 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { /// where the client-to-server message stream has messages of type \a W, /// and the server-to-client message stream has messages of type \a R. template -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { +class ClientAsyncReaderWriterInterface + : public internal::ClientAsyncStreamingInterface, + public internal::AsyncWriterInterface, + public internal::AsyncReaderInterface { public: /// Signal the client is done with the writes (half-close the client stream). /// Thread-safe with respect to \a AsyncReaderInterface::Read @@ -434,24 +457,25 @@ template class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface { public: - /// Create a stream object. - /// Start the RPC request if \a start is set. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). If \a start is not set, \a tag must be - /// nullptr and the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, bool start, - void* tag) { - Call call = channel->CreateCall(method, context, cq); - - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, start, tag); - } + struct internal { + /// Create a stream object. + /// Start the RPC request if \a start is set. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter* Create( + ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + bool start, void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, start, tag); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -532,8 +556,8 @@ class ClientAsyncReaderWriter final } private: - ClientAsyncReaderWriter(Call call, ClientContext* context, bool start, - void* tag) + ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, + bool start, void* tag) : context_(context), call_(call), started_(start) { if (start) { StartCallInternal(tag); @@ -554,18 +578,26 @@ class ClientAsyncReaderWriter final } ClientContext* context_; - Call call_; + ::grpc::internal::Call call_; bool started_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> write_ops_; - CallOpSet finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + finish_ops_; }; template -class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, - public AsyncReaderInterface { +class ServerAsyncReaderInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncReaderInterface { public: /// Indicate that the stream is to be finished with a certain status code /// and also send out \a msg response to the client. @@ -692,20 +724,23 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_ops_; }; template -class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface { +class ServerAsyncWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface { public: /// Indicate that the stream is to be finished with a certain status code. /// Request notification for when the server has sent the appropriate @@ -823,7 +858,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template void EnsureInitialMetadataSent(T* ops) { @@ -837,20 +872,25 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; /// Server-side interface for asynchronous bi-directional streaming. template -class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { +class ServerAsyncReaderWriterInterface + : public internal::ServerAsyncStreamingInterface, + public internal::AsyncWriterInterface, + public internal::AsyncReaderInterface { public: /// Indicate that the stream is to be finished with a certain status code. /// Request notification for when the server has sent the appropriate @@ -980,7 +1020,7 @@ class ServerAsyncReaderWriter final private: friend class ::grpc::Server; - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } template void EnsureInitialMetadataSent(T* ops) { @@ -994,14 +1034,18 @@ class ServerAsyncReaderWriter final } } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage> read_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> write_ops_; - CallOpSet finish_ops_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpServerSendStatus> + finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index e472f04f56e..8a3dfbc4db0 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -75,23 +75,24 @@ template class ClientAsyncResponseReader final : public ClientAsyncResponseReaderInterface { public: - /// Start a call and write the request out if \a start is set. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// If \a start is not set, the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template - static ClientAsyncResponseReader* Create(ChannelInterface* channel, - CompletionQueue* cq, - const RpcMethod& method, - ClientContext* context, - const W& request, bool start) { - Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request, start); - } + struct internal { + /// Start a call and write the request out if \a start is set. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template + static ClientAsyncResponseReader* Create( + ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + const W& request, bool start) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request, start); + } + }; // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -138,12 +139,12 @@ class ClientAsyncResponseReader final private: ClientContext* const context_; - Call call_; + ::grpc::internal::Call call_; bool started_; template - ClientAsyncResponseReader(Call call, ClientContext* context, const W& request, - bool start) + ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, + const W& request, bool start) : context_(context), call_(call), started_(start) { // Bind the metadata at time of StartCallInternal but set up the rest here // TODO(ctiller): don't assert @@ -162,19 +163,23 @@ class ClientAsyncResponseReader final static void* operator new(std::size_t size); static void* operator new(std::size_t size, void* p) { return p; } - SneakyCallOpSet + ::grpc::internal::SneakyCallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> init_buf; - CallOpSet meta_buf; - CallOpSet, - CallOpClientRecvStatus> + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + meta_buf; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_buf; }; /// Async server-side API for handling unary calls, where the single /// response message sent to the client is of type \a W. template -class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { +class ServerAsyncResponseWriter final + : public internal::ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -262,13 +267,15 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { } private: - void BindCall(Call* call) override { call_ = *call; } + void BindCall(::grpc::internal::Call* call) override { call_ = *call; } - Call call_; + ::grpc::internal::Call call_; ServerContext* ctx_; - CallOpSet meta_buf_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + meta_buf_; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpServerSendStatus> finish_buf_; }; diff --git a/include/grpc++/impl/codegen/byte_buffer.h b/include/grpc++/impl/codegen/byte_buffer.h index 57d731be182..fe73ce7a83d 100644 --- a/include/grpc++/impl/codegen/byte_buffer.h +++ b/include/grpc++/impl/codegen/byte_buffer.h @@ -31,18 +31,19 @@ namespace grpc { +namespace internal { +class CallOpSendMessage; template class CallOpRecvMessage; +class CallOpGenericRecvMessage; class MethodHandler; template class RpcMethodHandler; template class ServerStreamingHandler; -namespace CallOpGenericRecvMessageHelper { template class DeserializeFuncType; -} // namespace CallOpGenericRecvMessageHelper - +} // namespace internal /// A sequence of bytes. class ByteBuffer final { public: @@ -97,17 +98,17 @@ class ByteBuffer final { private: friend class SerializationTraits; - friend class CallOpSendMessage; + friend class internal::CallOpSendMessage; template - friend class CallOpRecvMessage; - friend class CallOpGenericRecvMessage; - friend class MethodHandler; + friend class internal::CallOpRecvMessage; + friend class internal::CallOpGenericRecvMessage; + friend class internal::MethodHandler; template - friend class RpcMethodHandler; + friend class internal::RpcMethodHandler; template - friend class ServerStreamingHandler; + friend class internal::ServerStreamingHandler; template - friend class CallOpGenericRecvMessageHelper::DeserializeFuncType; + friend class internal::DeserializeFuncType; grpc_byte_buffer* buffer_; diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 06f107fa83a..1a988297dc4 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -43,11 +43,13 @@ namespace grpc { class ByteBuffer; -class Call; -class CallHook; class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; +namespace internal { +class Call; +class CallHook; + const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin"; // TODO(yangg) if the map is changed before we send, the pointers will be a @@ -75,6 +77,7 @@ inline grpc_metadata* FillMetadataArray( } return metadata_array; } +} // namespace internal /// Per-message write options. class WriteOptions { @@ -199,6 +202,7 @@ class WriteOptions { bool last_message_; }; +namespace internal { /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template @@ -387,7 +391,6 @@ class CallOpRecvMessage { bool allow_not_getting_message_; }; -namespace CallOpGenericRecvMessageHelper { class DeserializeFunc { public: virtual Status Deserialize(ByteBuffer* buf) = 0; @@ -407,7 +410,6 @@ class DeserializeFuncType final : public DeserializeFunc { private: R* message_; // Not a managed pointer because management is external to this }; -} // namespace CallOpGenericRecvMessageHelper class CallOpGenericRecvMessage { public: @@ -418,8 +420,7 @@ class CallOpGenericRecvMessage { void RecvMessage(R* message) { // Use an explicit base class pointer to avoid resolution error in the // following unique_ptr::reset for some old implementations. - CallOpGenericRecvMessageHelper::DeserializeFunc* func = - new CallOpGenericRecvMessageHelper::DeserializeFuncType(message); + DeserializeFunc* func = new DeserializeFuncType(message); deserialize_.reset(func); } @@ -459,7 +460,7 @@ class CallOpGenericRecvMessage { } private: - std::unique_ptr deserialize_; + std::unique_ptr deserialize_; ByteBuffer recv_buf_; bool allow_not_getting_message_; }; @@ -714,7 +715,7 @@ class Call final { grpc_call* call_; int max_receive_message_size_; }; - +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CALL_H diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h index d026cc8b583..44e9de220ed 100644 --- a/include/grpc++/impl/codegen/call_hook.h +++ b/include/grpc++/impl/codegen/call_hook.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { class CallOpSetInterface; class Call; @@ -31,6 +32,7 @@ class CallHook { virtual ~CallHook() {} virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h index 1b7590bf0c3..41c213f64a9 100644 --- a/include/grpc++/impl/codegen/channel_interface.h +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -24,10 +24,8 @@ #include namespace grpc { -class Call; +class ChannelInterface; class ClientContext; -class RpcMethod; -class CallOpSetInterface; class CompletionQueue; template @@ -45,6 +43,14 @@ class ClientAsyncReaderWriter; template class ClientAsyncResponseReader; +namespace internal { +class Call; +class CallOpSetInterface; +class RpcMethod; +template +class BlockingUnaryCallImpl; +} // namespace internal + /// Codegen interface for \a grpc::Channel. class ChannelInterface { public: @@ -96,15 +102,13 @@ class ChannelInterface { template friend class ::grpc::ClientAsyncResponseReader; template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); - friend class ::grpc::RpcMethod; - virtual Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + friend class ::grpc::internal::BlockingUnaryCallImpl; + friend class ::grpc::internal::RpcMethod; + virtual internal::Call CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; virtual void* RegisterMethod(const char* method) = 0; virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, @@ -112,7 +116,6 @@ class ChannelInterface { virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; }; - } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index 6d7e13bbf29..22b581cbc55 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -60,7 +60,16 @@ class Channel; class ChannelInterface; class CompletionQueue; class CallCredentials; +class ClientContext; + +namespace internal { class RpcMethod; +class CallOpClientRecvStatus; +class CallOpRecvInitialMetadata; +template +class BlockingUnaryCallImpl; +} // namespace internal + template class ClientReader; template @@ -345,8 +354,8 @@ class ClientContext { ClientContext& operator=(const ClientContext&); friend class ::grpc::testing::InteropClientContextInspector; - friend class CallOpClientRecvStatus; - friend class CallOpRecvInitialMetadata; + friend class ::grpc::internal::CallOpClientRecvStatus; + friend class ::grpc::internal::CallOpRecvInitialMetadata; friend class Channel; template friend class ::grpc::ClientReader; @@ -363,11 +372,7 @@ class ClientContext { template friend class ::grpc::ClientAsyncResponseReader; template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend class ::grpc::internal::BlockingUnaryCallImpl; grpc_call* call() const { return call_; } void set_call(grpc_call* call, const std::shared_ptr& channel); @@ -399,8 +404,8 @@ class ClientContext { mutable std::shared_ptr auth_context_; struct census_context* census_context_; std::multimap send_initial_metadata_; - MetadataMap recv_initial_metadata_; - MetadataMap trailing_metadata_; + internal::MetadataMap recv_initial_metadata_; + internal::MetadataMap trailing_metadata_; grpc_call* propagate_from_call_; PropagationOptions propagation_options_; diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 7c540fade9d..170c562cf3b 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -30,43 +30,60 @@ namespace grpc { class Channel; class ClientContext; class CompletionQueue; -class RpcMethod; +namespace internal { +class RpcMethod; /// Wrapper that performs a blocking unary call template Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { - CompletionQueue cq(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue - Call call(channel->CreateCall(method, context, &cq)); - CallOpSet, - CallOpClientSendClose, CallOpClientRecvStatus> - ops; - Status status = ops.SendMessage(request); - if (!status.ok()) { - return status; - } - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - ops.RecvInitialMetadata(context); - ops.RecvMessage(result); - ops.ClientSendClose(); - ops.ClientRecvStatus(context, &status); - call.PerformOps(&ops); - if (cq.Pluck(&ops)) { - if (!ops.got_message && status.ok()) { - return Status(StatusCode::UNIMPLEMENTED, - "No message returned for unary request"); + return BlockingUnaryCallImpl( + channel, method, context, request, result) + .status(); +}; + +template +class BlockingUnaryCallImpl { + public: + BlockingUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method, + ClientContext* context, const InputMessage& request, + OutputMessage* result) { + CompletionQueue cq(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue + Call call(channel->CreateCall(method, context, &cq)); + CallOpSet, + CallOpClientSendClose, CallOpClientRecvStatus> + ops; + status_ = ops.SendMessage(request); + if (!status_.ok()) { + return; + } + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + ops.RecvInitialMetadata(context); + ops.RecvMessage(result); + ops.ClientSendClose(); + ops.ClientRecvStatus(context, &status_); + call.PerformOps(&ops); + if (cq.Pluck(&ops)) { + if (!ops.got_message && status_.ok()) { + status_ = Status(StatusCode::UNIMPLEMENTED, + "No message returned for unary request"); + } + } else { + GPR_CODEGEN_ASSERT(!status_.ok()); } - } else { - GPR_CODEGEN_ASSERT(!status.ok()); } - return status; -} + Status status() { return status_; } + + private: + Status status_; +}; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index e2c0c29dca4..14de30a62e6 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -56,7 +56,19 @@ class ServerWriter; namespace internal { template class ServerReaderWriterBody; -} +} // namespace internal + +class Channel; +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class Server; +class ServerBuilder; +class ServerContext; + +namespace internal { +class CompletionQueueTag; +class RpcMethod; template class RpcMethodHandler; template @@ -66,16 +78,11 @@ class ServerStreamingHandler; template class BidiStreamingHandler; class UnknownMethodHandler; - -class Channel; -class ChannelInterface; -class ClientContext; -class CompletionQueueTag; -class CompletionQueue; -class RpcMethod; -class Server; -class ServerBuilder; -class ServerContext; +template +class TemplatedBidiStreamingHandler; +template +class BlockingUnaryCallImpl; +} // namespace internal extern CoreCodegenInterface* g_core_codegen_interface; @@ -220,22 +227,18 @@ class CompletionQueue : private GrpcLibraryCodegen { template friend class ::grpc::internal::ServerReaderWriterBody; template - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; template - friend Status BlockingUnaryCall(ChannelInterface* channel, - const RpcMethod& method, - ClientContext* context, - const InputMessage& request, - OutputMessage* result); + friend class ::grpc::internal::BlockingUnaryCallImpl; /// EXPERIMENTAL /// Creates a Thread Local cache to store the first event @@ -256,7 +259,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag) { + bool Pluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( @@ -277,7 +280,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// implementation to simple call the other TryPluck function with a zero /// timeout. i.e: /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(CompletionQueueTag* tag) { + void TryPluck(internal::CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); @@ -293,7 +296,7 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects /// that the tag is internal not something that is returned to the user. - void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) { + void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) { auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( cq_, tag, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h index 4d7d3a98dd8..cb16bcf9ff5 100644 --- a/include/grpc++/impl/codegen/completion_queue_tag.h +++ b/include/grpc++/impl/codegen/completion_queue_tag.h @@ -21,6 +21,7 @@ namespace grpc { +namespace internal { /// An interface allowing implementors to process and filter event tags. class CompletionQueueTag { public: @@ -31,6 +32,7 @@ class CompletionQueueTag { /// queue virtual bool FinalizeResult(void** tag, bool* status) = 0; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h index b73985967d3..fd4750efdd1 100644 --- a/include/grpc++/impl/codegen/metadata_map.h +++ b/include/grpc++/impl/codegen/metadata_map.h @@ -23,6 +23,7 @@ namespace grpc { +namespace internal { class MetadataMap { public: MetadataMap() { memset(&arr_, 0, sizeof(arr_)); } @@ -50,6 +51,7 @@ class MetadataMap { grpc_metadata_array arr_; std::multimap map_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index e14cb0e9265..c0af4ca1307 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -26,6 +26,7 @@ namespace grpc { +namespace internal { /// A wrapper class of an application provided rpc method handler. template class RpcMethodHandler : public MethodHandler { @@ -266,6 +267,7 @@ class UnknownMethodHandler : public MethodHandler { } }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index ac13ac56c7a..54e52364ef3 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -24,7 +24,7 @@ #include namespace grpc { - +namespace internal { /// Descriptor of an RPC method class RpcMethod { public: @@ -55,6 +55,7 @@ class RpcMethod { void* const channel_tag_; }; +} // namespace internal } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index d356012ad60..5ba11e8559b 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -32,8 +32,8 @@ namespace grpc { class ServerContext; -class StreamContextInterface; +namespace internal { /// Base class for running an RPC handler. class MethodHandler { public: @@ -71,6 +71,7 @@ class RpcServiceMethod : public RpcMethod { void* server_tag_; std::unique_ptr handler_; }; +} // namespace internal } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index b5e37fd12b1..a2d6967bf84 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -55,7 +55,6 @@ class ServerWriter; namespace internal { template class ServerReaderWriterBody; -} template class RpcMethodHandler; template @@ -65,9 +64,11 @@ class ServerStreamingHandler; template class BidiStreamingHandler; class UnknownMethodHandler; - +template +class TemplatedBidiStreamingHandler; class Call; -class CallOpBuffer; +} // namespace internal + class CompletionQueue; class Server; class ServerInterface; @@ -247,14 +248,14 @@ class ServerContext { template friend class ::grpc::internal::ServerReaderWriterBody; template - friend class RpcMethodHandler; + friend class ::grpc::internal::RpcMethodHandler; template - friend class ClientStreamingHandler; + friend class ::grpc::internal::ClientStreamingHandler; template - friend class ServerStreamingHandler; + friend class ::grpc::internal::ServerStreamingHandler; template - friend class TemplatedBidiStreamingHandler; - friend class UnknownMethodHandler; + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::ClientContext; /// Prevent copying. @@ -263,9 +264,9 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(Call* call); + void BeginCompletionOp(internal::Call* call); /// Return the tag queued by BeginCompletionOp() - CompletionQueueTag* GetCompletionOpTag(); + internal::CompletionQueueTag* GetCompletionOpTag(); ServerContext(gpr_timespec deadline, grpc_metadata_array* arr); @@ -282,7 +283,7 @@ class ServerContext { CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr auth_context_; - MetadataMap client_metadata_; + internal::MetadataMap client_metadata_; std::multimap initial_metadata_; std::multimap trailing_metadata_; @@ -290,7 +291,9 @@ class ServerContext { grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; - CallOpSet pending_ops_; + internal::CallOpSet + pending_ops_; bool has_pending_ops_; }; diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index 55937f19d7c..3bcf4c87e7c 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -30,20 +30,21 @@ namespace grpc { class AsyncGenericService; class Channel; class GenericServerContext; -class RpcService; -class ServerAsyncStreamingInterface; class ServerCompletionQueue; class ServerContext; class ServerCredentials; class Service; -class ThreadPoolInterface; extern CoreCodegenInterface* g_core_codegen_interface; /// Models a gRPC server. /// /// Servers are configured and started via \a grpc::ServerBuilder. -class ServerInterface : public CallHook { +namespace internal { +class ServerAsyncStreamingInterface; +} // namespace internal + +class ServerInterface : public internal::CallHook { public: virtual ~ServerInterface() {} @@ -78,7 +79,7 @@ class ServerInterface : public CallHook { virtual void Wait() = 0; protected: - friend class Service; + friend class ::grpc::Service; /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -116,12 +117,13 @@ class ServerInterface : public CallHook { virtual grpc_server* server() = 0; - virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; + virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) = 0; - class BaseAsyncRequest : public CompletionQueueTag { + class BaseAsyncRequest : public internal::CompletionQueueTag { public: BaseAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, bool delete_on_finalize); virtual ~BaseAsyncRequest(); @@ -131,7 +133,7 @@ class ServerInterface : public CallHook { protected: ServerInterface* const server_; ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; + internal::ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; void* const tag_; const bool delete_on_finalize_; @@ -141,7 +143,7 @@ class ServerInterface : public CallHook { class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag); // uses BaseAsyncRequest::FinalizeResult @@ -155,7 +157,7 @@ class ServerInterface : public CallHook { public: NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { @@ -170,7 +172,7 @@ class ServerInterface : public CallHook { public: PayloadAsyncRequest(void* registered_method, ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) @@ -212,7 +214,7 @@ class ServerInterface : public CallHook { void* const registered_method_; ServerInterface* const server_; ServerContext* const context_; - ServerAsyncStreamingInterface* const stream_; + internal::ServerAsyncStreamingInterface* const stream_; CompletionQueue* const call_cq_; ServerCompletionQueue* const notification_cq_; void* const tag_; @@ -223,7 +225,7 @@ class ServerInterface : public CallHook { class GenericAsyncRequest : public BaseAsyncRequest { public: GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize); @@ -235,8 +237,9 @@ class ServerInterface : public CallHook { }; template - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* message) { @@ -246,8 +249,9 @@ class ServerInterface : public CallHook { message); } - void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context, - ServerAsyncStreamingInterface* stream, + void RequestAsyncCall(internal::RpcServiceMethod* method, + ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); @@ -256,7 +260,7 @@ class ServerInterface : public CallHook { } void RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 2dc4ea0ea65..71c3d99d5c5 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -28,13 +28,14 @@ namespace grpc { -class Call; class CompletionQueue; class Server; class ServerInterface; class ServerCompletionQueue; class ServerContext; +namespace internal { +class Call; class ServerAsyncStreamingInterface { public: virtual ~ServerAsyncStreamingInterface() {} @@ -48,9 +49,10 @@ class ServerAsyncStreamingInterface { virtual void SendInitialMetadata(void* tag) = 0; private: - friend class ServerInterface; + friend class ::grpc::ServerInterface; virtual void BindCall(Call* call) = 0; }; +} // namespace internal /// Desriptor of an RPC service and its various RPC methods class Service { @@ -88,40 +90,38 @@ class Service { protected: template void RequestAsyncUnary(int index, ServerContext* context, Message* request, - ServerAsyncStreamingInterface* stream, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncClientStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncClientStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } template - void RequestAsyncServerStreaming(int index, ServerContext* context, - Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncServerStreaming( + int index, ServerContext* context, Message* request, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag, request); } - void RequestAsyncBidiStreaming(int index, ServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { + void RequestAsyncBidiStreaming( + int index, ServerContext* context, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq, notification_cq, tag); } - void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } + void AddMethod(internal::RpcServiceMethod* method) { + methods_.emplace_back(method); + } void MarkMethodAsync(int index) { GPR_CODEGEN_ASSERT( @@ -139,7 +139,7 @@ class Service { methods_[index].reset(); } - void MarkMethodStreamed(int index, MethodHandler* streamed_method) { + void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method Streamed"); methods_[index]->SetHandler(streamed_method); @@ -148,14 +148,14 @@ class Service { // case of BIDI_STREAMING that has 1 read and 1 write, in that order, // and split server-side streaming is BIDI_STREAMING with 1 read and // any number of writes, in that order. - methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); + methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING); } private: friend class Server; friend class ServerInterface; ServerInterface* server_; - std::vector> methods_; + std::vector> methods_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index c1784f1820a..49ace0beeb6 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -30,6 +30,7 @@ namespace grpc { +namespace internal { /// Common interface for all synchronous client side streaming. class ClientStreamingInterface { public: @@ -141,10 +142,12 @@ class WriterInterface { } }; +} // namespace internal + /// Client-side interface for streaming reads of message of type \a R. template -class ClientReaderInterface : public ClientStreamingInterface, - public ReaderInterface { +class ClientReaderInterface : public internal::ClientStreamingInterface, + public internal::ReaderInterface { public: /// Block to wait for initial metadata from server. The received metadata /// can only be accessed after this call returns. Should only be called before @@ -159,28 +162,14 @@ class ClientReaderInterface : public ClientStreamingInterface, template class ClientReader final : public ClientReaderInterface { public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial - /// metadata used to send to the server when starting the call. - template - ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const W& request) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet - ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); - ops.ClientSendClose(); - call_.PerformOps(&ops); - cq_.Pluck(&ops); - } + struct internal { + template + static ClientReader* Create(ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) { + return new ClientReader(channel, method, context, request); + } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -192,7 +181,8 @@ class ClientReader final : public ClientReaderInterface { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); /// status ignored @@ -209,7 +199,9 @@ class ClientReader final : public ClientReaderInterface { /// already received (if initial metadata is received, it can be then /// accessed through the \a ClientContext associated with this call). bool Read(R* msg) override { - CallOpSet> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -224,7 +216,7 @@ class ClientReader final : public ClientReaderInterface { /// The \a ClientContext associated with this call is updated with /// possible metadata received from the server. Status Finish() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); @@ -235,13 +227,38 @@ class ClientReader final : public ClientReaderInterface { private: ClientContext* context_; CompletionQueue cq_; - Call call_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial + /// metadata used to send to the server when starting the call. + template + ClientReader(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } }; /// Client-side interface for streaming writes of message type \a W. template -class ClientWriterInterface : public ClientStreamingInterface, - public WriterInterface { +class ClientWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface { public: /// Half close writing from the client. (signal that the stream of messages /// coming from the client is complete). @@ -258,30 +275,14 @@ class ClientWriterInterface : public ClientStreamingInterface, template class ClientWriter : public ClientWriterInterface { public: - /// Block to create a stream (i.e. send request headers and other initial - /// metadata to the server). Note that \a context will be used to fill - /// in custom initial metadata. \a response will be filled in with the - /// single expected response message from the server upon a successful - /// call to the \a Finish method of this instance. - template - ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, R* response) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - - if (!context_->initial_metadata_corked_) { - CallOpSet ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + struct internal { + template + static ClientWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) { + return new ClientWriter(channel, method, context, response); } - } + }; /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. @@ -292,7 +293,8 @@ class ClientWriter : public ClientWriterInterface { void WaitForInitialMetadata() { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -304,10 +306,11 @@ class ClientWriter : public ClientWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call). - using WriterInterface::Write; + using ::grpc::internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -328,7 +331,7 @@ class ClientWriter : public ClientWriterInterface { } bool WritesDone() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -352,21 +355,50 @@ class ClientWriter : public ClientWriterInterface { } private: + /// Block to create a stream (i.e. send request headers and other initial + /// metadata to the server). Note that \a context will be used to fill + /// in custom initial metadata. \a response will be filled in with the + /// single expected response message from the server upon a successful + /// call to the \a Finish method of this instance. + + template + ClientWriter(ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } + ClientContext* context_; - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpGenericRecvMessage, + ::grpc::internal::CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; - Call call_; + ::grpc::internal::Call call_; }; /// Client-side interface for bi-directional streaming with /// client-to-server stream messages of type \a W and /// server-to-client stream messages of type \a R. template -class ClientReaderWriterInterface : public ClientStreamingInterface, - public WriterInterface, - public ReaderInterface { +class ClientReaderWriterInterface : public internal::ClientStreamingInterface, + public internal::WriterInterface, + public internal::ReaderInterface { public: /// Block to wait for initial metadata from server. The received metadata /// can only be accessed after this call returns. Should only be called before @@ -375,7 +407,7 @@ class ClientReaderWriterInterface : public ClientStreamingInterface, virtual void WaitForInitialMetadata() = 0; /// Half close writing from the client. (signal that the stream of messages - /// coming from the client is complete). + /// coming from the clinet is complete). /// Blocks until currently-pending writes are completed. /// Thread-safe with respect to \a ReaderInterface::Read /// @@ -390,24 +422,13 @@ class ClientReaderWriterInterface : public ClientStreamingInterface, template class ClientReaderWriter final : public ClientReaderWriterInterface { public: - /// Block to create a stream and write the initial metadata and \a request - /// out. Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context) - : context_(context), - cq_(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, - GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq - call_(channel->CreateCall(method, context, &cq_)) { - if (!context_->initial_metadata_corked_) { - CallOpSet ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + struct internal { + static ClientReaderWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) { + return new ClientReaderWriter(channel, method, context); } - } + }; /// Block waiting to read initial metadata from the server. /// This call is optional, but if it is used, it cannot be used concurrently @@ -418,7 +439,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { void WaitForInitialMetadata() override { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> + ops; ops.RecvInitialMetadata(context_); call_.PerformOps(&ops); cq_.Pluck(&ops); // status ignored @@ -434,7 +456,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// Also receives initial metadata if not already received (updates the \a /// ClientContext associated with this call in that case). bool Read(R* msg) override { - CallOpSet> ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpRecvMessage> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -448,10 +472,11 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface::Write; + using ::grpc::internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { - CallOpSet + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, + ::grpc::internal::CallOpSendMessage, + ::grpc::internal::CallOpClientSendClose> ops; if (options.is_last_message()) { @@ -472,7 +497,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { } bool WritesDone() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; ops.ClientSendClose(); call_.PerformOps(&ops); return cq_.Pluck(&ops); @@ -484,7 +509,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { /// - the \a ClientContext associated with this call is updated with /// possible trailing metadata sent from the server. Status Finish() override { - CallOpSet ops; + ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, + ::grpc::internal::CallOpClientRecvStatus> + ops; if (!context_->initial_metadata_received_) { ops.RecvInitialMetadata(context_); } @@ -498,13 +525,34 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { private: ClientContext* context_; CompletionQueue cq_; - Call call_; + ::grpc::internal::Call call_; + + /// Block to create a stream and write the initial metadata and \a request + /// out. Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + ClientReaderWriter(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context) + : context_(context), + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { + if (!context_->initial_metadata_corked_) { + ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> + ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } + } }; /// Server-side interface for streaming reads of message of type \a R. template -class ServerReaderInterface : public ServerStreamingInterface, - public ReaderInterface {}; +class ServerReaderInterface : public internal::ServerStreamingInterface, + public internal::ReaderInterface {}; /// Synchronous (blocking) server-side API for doing client-streaming RPCs, /// where the incoming message stream coming from the client has messages of @@ -512,15 +560,13 @@ class ServerReaderInterface : public ServerStreamingInterface, template class ServerReader final : public ServerReaderInterface { public: - ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet ops; + internal::CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -537,21 +583,27 @@ class ServerReader final : public ServerReaderInterface { } bool Read(R* msg) override { - CallOpSet> ops; + internal::CallOpSet> ops; ops.RecvMessage(msg); call_->PerformOps(&ops); return call_->cq()->Pluck(&ops) && ops.got_message; } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; + + template + friend class internal::ClientStreamingHandler; + + ServerReader(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} }; /// Server-side interface for streaming writes of message of type \a W. template -class ServerWriterInterface : public ServerStreamingInterface, - public WriterInterface {}; +class ServerWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface {}; /// Synchronous (blocking) server-side API for doing for doing a /// server-streaming RPCs, where the outgoing message stream coming from the @@ -559,8 +611,6 @@ class ServerWriterInterface : public ServerStreamingInterface, template class ServerWriter final : public ServerWriterInterface { public: - ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. /// Note that initial metadata will be affected by the @@ -568,7 +618,7 @@ class ServerWriter final : public ServerWriterInterface { void SendInitialMetadata() override { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - CallOpSet ops; + internal::CallOpSet ops; ops.SendInitialMetadata(ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { @@ -584,11 +634,12 @@ class ServerWriter final : public ServerWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the /// \a ClientContext associated with this call to fill in values). - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { if (options.is_last_message()) { options.set_buffer_hint(); } + if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { return false; } @@ -613,15 +664,21 @@ class ServerWriter final : public ServerWriterInterface { } private: - Call* const call_; + internal::Call* const call_; ServerContext* const ctx_; + + template + friend class internal::ServerStreamingHandler; + + ServerWriter(internal::Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} }; /// Server-side interface for bi-directional streaming. template -class ServerReaderWriterInterface : public ServerStreamingInterface, - public WriterInterface, - public ReaderInterface {}; +class ServerReaderWriterInterface : public internal::ServerStreamingInterface, + public internal::WriterInterface, + public internal::ReaderInterface {}; /// Actual implementation of bi-directional streaming namespace internal { @@ -688,6 +745,7 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; + } // namespace internal /// Synchronous (blocking) server-side API for a bidirectional @@ -697,8 +755,6 @@ class ServerReaderWriterBody final { template class ServerReaderWriter final : public ServerReaderWriterInterface { public: - ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {} - /// See the \a ServerStreamingInterface.SendInitialMetadata method /// for semantics. Note that initial metadata will be affected by the /// \a ServerContext associated with this call. @@ -715,13 +771,18 @@ class ServerReaderWriter final : public ServerReaderWriterInterface { /// Side effect: /// Also sends initial metadata if not already sent (using the \a /// ServerContext associated with this call). - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } private: internal::ServerReaderWriterBody body_; + + friend class internal::TemplatedBidiStreamingHandler, + false>; + ServerReaderWriter(internal::Call* call, ServerContext* ctx) + : body_(call, ctx) {} }; /// A class to represent a flow-controlled unary call. This is something @@ -736,9 +797,6 @@ template class ServerUnaryStreamer final : public ServerReaderWriterInterface { public: - ServerUnaryStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false), write_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -775,7 +833,7 @@ class ServerUnaryStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; @@ -788,6 +846,11 @@ class ServerUnaryStreamer final internal::ServerReaderWriterBody body_; bool read_done_; bool write_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerUnaryStreamer, true>; + ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false), write_done_(false) {} }; /// A class to represent a flow-controlled server-side streaming call. @@ -799,9 +862,6 @@ template class ServerSplitStreamer final : public ServerReaderWriterInterface { public: - ServerSplitStreamer(Call* call, ServerContext* ctx) - : body_(call, ctx), read_done_(false) {} - /// Block to send initial metadata to client. /// Implicit input parameter: /// - the \a ServerContext associated with this call will be used for @@ -838,7 +898,7 @@ class ServerSplitStreamer final /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - using WriterInterface::Write; + using internal::WriterInterface::Write; bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } @@ -846,6 +906,11 @@ class ServerSplitStreamer final private: internal::ServerReaderWriterBody body_; bool read_done_; + + friend class internal::TemplatedBidiStreamingHandler< + ServerSplitStreamer, false>; + ServerSplitStreamer(internal::Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h index 589deb4f03d..d464d6ea136 100644 --- a/include/grpc++/impl/codegen/time.h +++ b/include/grpc++/impl/codegen/time.h @@ -19,6 +19,8 @@ #ifndef GRPCXX_IMPL_CODEGEN_TIME_H #define GRPCXX_IMPL_CODEGEN_TIME_H +#include + #include #include @@ -59,10 +61,6 @@ class TimePoint { } // namespace grpc -#include - -#include - namespace grpc { // from and to should be absolute time. diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 0a3aae8241c..01c4a60d216 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -175,7 +175,8 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen { /// \param num_cqs How many completion queues does \a cqs hold. void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override; + void PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) override; void ShutdownInternal(gpr_timespec deadline) override; diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index a948abedb58..0888bef0d95 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -40,7 +40,6 @@ namespace grpc { class AsyncGenericService; class ResourceQuota; class CompletionQueue; -class RpcService; class Server; class ServerCompletionQueue; class ServerCredentials; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index c2db8eff714..9efd6208b0e 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file, printer->Print(vars, "namespace grpc {\n"); printer->Print(vars, "class CompletionQueue;\n"); printer->Print(vars, "class Channel;\n"); - printer->Print(vars, "class RpcService;\n"); printer->Print(vars, "class ServerCompletionQueue;\n"); printer->Print(vars, "class ServerContext;\n"); printer->Print(vars, "} // namespace grpc\n\n"); @@ -324,7 +323,8 @@ void PrintHeaderClientMethodInterfaces( } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw(" + "virtual ::grpc::ClientReaderInterface< $Response$>* " + "$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) = 0;\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; @@ -546,7 +546,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer, const grpc_generator::Method *method, std::map *vars) { (*vars)["Method"] = method->name(); - printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n"); + printer->Print(*vars, + "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n"); } void PrintHeaderServerMethodSync(grpc_generator::Printer *printer, @@ -718,7 +719,7 @@ void PrintHeaderServerMethodStreamedUnary( printer->Print(*vars, "WithStreamedUnaryMethod_$Method$() {\n" " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::StreamedUnaryHandler< $Request$, " + " new ::grpc::internal::StreamedUnaryHandler< $Request$, " "$Response$>(std::bind" "(&WithStreamedUnaryMethod_$Method$::" "Streamed$Method$, this, std::placeholders::_1, " @@ -766,15 +767,16 @@ void PrintHeaderServerMethodSplitStreaming( "{}\n"); printer->Print(" public:\n"); printer->Indent(); - printer->Print(*vars, - "WithSplitStreamingMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodStreamed($Idx$,\n" - " new ::grpc::SplitServerStreamingHandler< $Request$, " - "$Response$>(std::bind" - "(&WithSplitStreamingMethod_$Method$::" - "Streamed$Method$, this, std::placeholders::_1, " - "std::placeholders::_2)));\n" - "}\n"); + printer->Print( + *vars, + "WithSplitStreamingMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodStreamed($Idx$,\n" + " new ::grpc::internal::SplitServerStreamingHandler< $Request$, " + "$Response$>(std::bind" + "(&WithSplitStreamingMethod_$Method$::" + "Streamed$Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" + "}\n"); printer->Print(*vars, "~WithSplitStreamingMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" @@ -914,7 +916,8 @@ void PrintHeaderService(grpc_generator::Printer *printer, " {\n public:\n"); printer->Indent(); printer->Print( - "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n"); + "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& " + "channel);\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, true); } @@ -1185,10 +1188,9 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); printer->Print(*vars, - " return ::grpc::BlockingUnaryCall(channel_.get(), " - "rpcmethod_$Method$_, " - "context, request, response);\n" - "}\n\n"); + " return ::grpc::internal::BlockingUnaryCall" + "(channel_.get(), rpcmethod_$Method$_, " + "context, request, response);\n}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1198,25 +1200,27 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "ClientContext* context, " "const $Request$& request, " "::grpc::CompletionQueue* cq) {\n"); - printer->Print(*vars, - " return " - "::grpc::ClientAsyncResponseReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request, $AsyncStart$);\n" - "}\n\n"); + printer->Print( + *vars, + " return " + "::grpc::ClientAsyncResponseReader< $Response$>::internal::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$);\n" + "}\n\n"); } } else if (ClientOnlyStreaming(method)) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, $Response$* response) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientWriter< $Request$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, response);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientWriter< $Request$>::internal::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, response);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1227,12 +1231,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncWriter< $Request$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, response, $AsyncStart$$AsyncCreateArgs$);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientAsyncWriter< $Request$>::internal::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, response, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); } } else if (ServerOnlyStreaming(method)) { printer->Print( @@ -1240,12 +1245,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReader< $Response$>* " "$ns$$Service$::Stub::$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientReader< $Response$>(" - "channel_.get(), " - "rpcmethod_$Method$_, " - "context, request);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientReader< $Response$>::internal::Create(" + "channel_.get(), " + "rpcmethod_$Method$_, " + "context, request);\n" + "}\n\n"); for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1257,12 +1263,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); - printer->Print(*vars, - " return ::grpc::ClientAsyncReader< $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, request, $AsyncStart$$AsyncCreateArgs$);\n" - "}\n\n"); + printer->Print( + *vars, + " return ::grpc::ClientAsyncReader< $Response$>::internal::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, request, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); } } else if (method->BidiStreaming()) { printer->Print( @@ -1270,8 +1277,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n"); printer->Print(*vars, - " return new ::grpc::ClientReaderWriter< " - "$Request$, $Response$>(" + " return ::grpc::ClientReaderWriter< " + "$Request$, $Response$>::internal::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context);\n" @@ -1286,14 +1293,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::" "ClientContext* context, " "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); - printer->Print( - *vars, - " return " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, $AsyncStart$$AsyncCreateArgs$);\n" - "}\n\n"); + printer->Print(*vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, " + "$Response$>::internal::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, $AsyncStart$$AsyncCreateArgs$);\n" + "}\n\n"); } } } @@ -1404,7 +1411,7 @@ void PrintSourceService(grpc_generator::Printer *printer, printer->Print(*vars, ", rpcmethod_$Method$_(" "$prefix$$Service$_method_names[$Idx$], " - "::grpc::RpcMethod::$StreamingType$, " + "::grpc::internal::RpcMethod::$StreamingType$, " "channel" ")\n"); } @@ -1427,38 +1434,38 @@ void PrintSourceService(grpc_generator::Printer *printer, if (method->NoStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " + " ::grpc::internal::RpcMethod::NORMAL_RPC,\n" + " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::CLIENT_STREAMING,\n" - " new ::grpc::ClientStreamingHandler< " + " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n" + " new ::grpc::internal::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method.get())) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::SERVER_STREAMING,\n" - " new ::grpc::ServerStreamingHandler< " + " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n" + " new ::grpc::internal::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (method->BidiStreaming()) { printer->Print( *vars, - "AddMethod(new ::grpc::RpcServiceMethod(\n" + "AddMethod(new ::grpc::internal::RpcServiceMethod(\n" " $prefix$$Service$_method_names[$Idx$],\n" - " ::grpc::RpcMethod::BIDI_STREAMING,\n" - " new ::grpc::BidiStreamingHandler< " + " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n" + " new ::grpc::internal::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 19a25c838fb..4049fc846b9 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -52,7 +52,7 @@ namespace { int kConnectivityCheckIntervalMsec = 500; void WatchStateChange(void* arg); -class TagSaver final : public CompletionQueueTag { +class TagSaver final : public internal::CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} ~TagSaver() override {} @@ -259,8 +259,9 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } -Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) { +internal::Call Channel::CreateCall(const internal::RpcMethod& method, + ClientContext* context, + CompletionQueue* cq) { const bool kRegistered = method.channel_tag() && context->authority().empty(); grpc_call* c_call = NULL; if (kRegistered) { @@ -292,10 +293,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } grpc_census_call_set_context(c_call, context->census_context()); context->set_call(c_call, shared_from_this()); - return Call(c_call, this, cq); + return internal::Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 693b8bea568..452a8a7501c 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,8 +27,9 @@ std::unique_ptr CallInternal( ChannelInterface* channel, ClientContext* context, const grpc::string& method, CompletionQueue* cq, bool start, void* tag) { return std::unique_ptr( - GenericClientAsyncReaderWriter::Create( - channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), + GenericClientAsyncReaderWriter::internal::Create( + channel, cq, internal::RpcMethod(method.c_str(), + internal::RpcMethod::BIDI_STREAMING), context, start, tag)); } @@ -52,8 +53,9 @@ std::unique_ptr GenericStub::PrepareUnaryCall( ClientContext* context, const grpc::string& method, const ByteBuffer& request, CompletionQueue* cq) { return std::unique_ptr( - GenericClientAsyncResponseReader::Create( - channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC), + GenericClientAsyncResponseReader::internal::Create( + channel_.get(), cq, + internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC), context, request, false)); } diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 4a2e2be6880..eb6dc8cc5fb 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast(ev.tag); + auto cq_tag = static_cast(ev.tag); *ok = ev.success != 0; *tag = cq_tag; if (cq_tag->FinalizeResult(tag, ok)) { @@ -87,7 +87,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto cq_tag = static_cast(res_tag); + auto cq_tag = static_cast(res_tag); *ok = res == 1; if (cq_tag->FinalizeResult(tag, ok)) { return true; diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index d2cba6d6627..10dbd3c39af 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -37,11 +37,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( DefaultHealthCheckService* service) : service_(service), method_(nullptr) { - MethodHandler* handler = - new RpcMethodHandler( + internal::MethodHandler* handler = + new internal::RpcMethodHandler( std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC, - handler); + method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); AddMethod(method_); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 09d5cebe98b..99d6680c501 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { private: const DefaultHealthCheckService* const service_; - RpcServiceMethod* method_; + internal::RpcServiceMethod* method_; }; DefaultHealthCheckService(); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index d982a3d2b76..6480482774e 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -90,7 +90,8 @@ class Server::UnimplementedAsyncRequest final ServerCompletionQueue* const cq_; }; -typedef SneakyCallOpSet +typedef internal::SneakyCallOpSet UnimplementedAsyncResponseOp; class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { @@ -108,12 +109,12 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; -class ShutdownTag : public CompletionQueueTag { +class ShutdownTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } }; -class DummyTag : public CompletionQueueTag { +class DummyTag : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { *status = true; @@ -121,15 +122,15 @@ class DummyTag : public CompletionQueueTag { } }; -class Server::SyncRequest final : public CompletionQueueTag { +class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), in_flight_(false), - has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::SERVER_STREAMING), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); @@ -212,14 +213,14 @@ class Server::SyncRequest final : public CompletionQueueTag { void Run(std::shared_ptr global_callbacks) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); + method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; cq_.Shutdown(); - CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); + internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag(); cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME)); /* Ensure the cq_ is shutdown */ @@ -229,15 +230,15 @@ class Server::SyncRequest final : public CompletionQueueTag { private: CompletionQueue cq_; - Call call_; + internal::Call call_; ServerContext ctx_; const bool has_request_payload_; grpc_byte_buffer* request_payload_; - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; }; private: - RpcServiceMethod* const method_; + internal::RpcServiceMethod* const method_; void* const tag_; bool in_flight_; const bool has_request_payload_; @@ -311,14 +312,15 @@ class Server::SyncRequestThreadManager : public ThreadManager { // object } - void AddSyncMethod(RpcServiceMethod* method, void* tag) { + void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) { sync_requests_.emplace_back(new SyncRequest(method, tag)); } void AddUnknownSyncMethod() { if (!sync_requests_.empty()) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + unknown_method_.reset(new internal::RpcServiceMethod( + "unknown", internal::RpcMethod::BIDI_STREAMING, + new internal::UnknownMethodHandler)); sync_requests_.emplace_back( new SyncRequest(unknown_method_.get(), nullptr)); } @@ -355,8 +357,8 @@ class Server::SyncRequestThreadManager : public ThreadManager { CompletionQueue* server_cq_; int cq_timeout_msec_; std::vector> sync_requests_; - std::unique_ptr unknown_method_; - std::unique_ptr health_check_; + std::unique_ptr unknown_method_; + std::unique_ptr health_check_; std::shared_ptr global_callbacks_; }; @@ -439,13 +441,13 @@ std::shared_ptr Server::InProcessChannel( } static grpc_server_register_method_payload_handling PayloadHandlingForMethod( - RpcServiceMethod* method) { + internal::RpcServiceMethod* method) { switch (method->method_type()) { - case RpcMethod::NORMAL_RPC: - case RpcMethod::SERVER_STREAMING: + case internal::RpcMethod::NORMAL_RPC: + case internal::RpcMethod::SERVER_STREAMING: return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; - case RpcMethod::CLIENT_STREAMING: - case RpcMethod::BIDI_STREAMING: + case internal::RpcMethod::CLIENT_STREAMING: + case internal::RpcMethod::BIDI_STREAMING: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); @@ -466,7 +468,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { continue; } - RpcServiceMethod* method = it->get(); + internal::RpcServiceMethod* method = it->get(); void* tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); @@ -606,7 +608,8 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { +void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops, + internal::Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; @@ -622,8 +625,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { ServerInterface::BaseAsyncRequest::BaseAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, - bool delete_on_finalize) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag, bool delete_on_finalize) : server_(server), context_(context), stream_(stream), @@ -645,7 +648,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } context_->set_call(call_); context_->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_receive_message_size()); + internal::Call call(call_, server_, call_cq_, + server_->max_receive_message_size()); if (*status && call_) { context_->BeginCompletionOp(&call); } @@ -660,7 +664,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + void* tag) : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( @@ -675,7 +680,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest( ServerInterface::GenericAsyncRequest::GenericAsyncRequest( ServerInterface* server, GenericServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) : BaseAsyncRequest(server, context, stream, call_cq, tag, delete_on_finalize) { @@ -718,7 +723,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( UnimplementedAsyncRequest* request) : request_(request) { Status status(StatusCode::UNIMPLEMENTED, ""); - UnknownMethodHandler::FillOps(request_->context(), this); + internal::UnknownMethodHandler::FillOps(request_->context(), this); request_->stream()->call_.PerformOps(this); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index d7876a000b4..f2cb6363f5c 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -37,7 +37,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp final : public CallOpSetInterface { +class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -146,7 +146,7 @@ ServerContext::~ServerContext() { } } -void ServerContext::BeginCompletionOp(Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call) { GPR_ASSERT(!completion_op_); completion_op_ = new CompletionOp(); if (has_notify_when_done_tag_) { @@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) { call->PerformOps(completion_op_); } -CompletionQueueTag* ServerContext::GetCompletionOpTag() { - return static_cast(completion_op_); +internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() { + return static_cast(completion_op_); } void ServerContext::AddInitialMetadata(const grpc::string& key, diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 3d664e88254..026a94112a6 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -39,7 +39,6 @@ namespace grpc { class CompletionQueue; class Channel; -class RpcService; class ServerCompletionQueue; class ServerContext; } // namespace grpc @@ -169,10 +168,10 @@ class ServiceA final { ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodA1_; - const ::grpc::RpcMethod rpcmethod_MethodA2_; - const ::grpc::RpcMethod rpcmethod_MethodA3_; - const ::grpc::RpcMethod rpcmethod_MethodA4_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA2_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA3_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA4_; }; static std::unique_ptr NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -352,7 +351,7 @@ class ServiceA final { public: WithStreamedUnaryMethod_MethodA1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -373,7 +372,7 @@ class ServiceA final { public: WithSplitStreamingMethod_MethodA3() { ::grpc::Service::MarkMethodStreamed(2, - new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); } ~WithSplitStreamingMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -427,7 +426,7 @@ class ServiceB final { std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodB1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -484,7 +483,7 @@ class ServiceB final { public: WithStreamedUnaryMethod_MethodB1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index a0c0414f2f5..020ec0461c4 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -70,7 +70,7 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg, grpc_cq_completion* completion) {} -class DummyTag final : public CompletionQueueTag { +class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) override { return true; } }; From 7a648854e9e53f5228ad1218b559e358f72a9a38 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 27 Oct 2017 10:22:51 -0700 Subject: [PATCH 2/2] Adopt the static factory pattern (#10) * Switch sync streams from "struct internal" to static factory in namespace internal * Reduce diff size * fix friends * Use static factory pattern for async unary calls * Use static factories for async streams * clang-format --- include/grpc++/impl/codegen/async_stream.h | 147 ++++++++++-------- .../grpc++/impl/codegen/async_unary_call.h | 43 ++--- .../grpc++/impl/codegen/channel_interface.h | 24 +-- include/grpc++/impl/codegen/sync_stream.h | 70 +++++---- src/compiler/cpp_generator.cc | 24 +-- src/cpp/client/generic_stub.cc | 4 +- 6 files changed, 174 insertions(+), 138 deletions(-) diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 2012b3170b3..4476033463b 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -159,33 +159,37 @@ class ClientAsyncReaderInterface : public internal::ClientAsyncStreamingInterface, public internal::AsyncReaderInterface {}; +namespace internal { +template +class ClientAsyncReaderFactory { + public: + /// Create a stream object. + /// Write the first request out if \a start is set. + /// \a tag will be notified on \a cq when the call has been started and + /// \a request has been written out. If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template + static ClientAsyncReader* Create(ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request, + bool start, void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, start, tag); + } +}; +} // namespace internal + /// Async client-side API for doing server-streaming RPCs, /// where the incoming message stream coming from the server has /// messages of type \a R. template class ClientAsyncReader final : public ClientAsyncReaderInterface { public: - struct internal { - /// Create a stream object. - /// Write the first request out if \a start is set. - /// \a tag will be notified on \a cq when the call has been started and - /// \a request has been written out. If \a start is not set, \a tag must be - /// nullptr and the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template - static ClientAsyncReader* Create(ChannelInterface* channel, - CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, const W& request, - bool start, void* tag) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReader))) - ClientAsyncReader(call, context, request, start, tag); - } - }; - // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { assert(size == sizeof(ClientAsyncReader)); @@ -240,6 +244,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { } private: + friend class internal::ClientAsyncReaderFactory; template ClientAsyncReader(::grpc::internal::Call call, ClientContext* context, const W& request, bool start, void* tag) @@ -291,37 +296,41 @@ class ClientAsyncWriterInterface virtual void WritesDone(void* tag) = 0; }; +namespace internal { +template +class ClientAsyncWriterFactory { + public: + /// Create a stream object. + /// Start the RPC if \a start is set + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, \a tag must be nullptr and the actual call + /// must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + /// \a response will be filled in with the single expected response + /// message from the server upon a successful call to the \a Finish + /// method of this instance. + template + static ClientAsyncWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response, + bool start, void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, start, tag); + } +}; +} // namespace internal + /// Async API on the client side for doing client-streaming RPCs, /// where the outgoing message stream going to the server contains /// messages of type \a W. template class ClientAsyncWriter final : public ClientAsyncWriterInterface { public: - struct internal { - /// Create a stream object. - /// Start the RPC if \a start is set - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// If \a start is not set, \a tag must be nullptr and the actual call - /// must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - /// \a response will be filled in with the single expected response - /// message from the server upon a successful call to the \a Finish - /// method of this instance. - template - static ClientAsyncWriter* Create(ChannelInterface* channel, - CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, R* response, - bool start, void* tag) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncWriter))) - ClientAsyncWriter(call, context, response, start, tag); - } - }; - // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { assert(size == sizeof(ClientAsyncWriter)); @@ -394,6 +403,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { } private: + friend class internal::ClientAsyncWriterFactory; template ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context, R* response, bool start, void* tag) @@ -449,6 +459,30 @@ class ClientAsyncReaderWriterInterface virtual void WritesDone(void* tag) = 0; }; +namespace internal { +template +class ClientAsyncReaderWriterFactory { + public: + /// Create a stream object. + /// Start the RPC request if \a start is set. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent). If \a start is not set, \a tag must be + /// nullptr and the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + static ClientAsyncReaderWriter* Create( + ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + bool start, void* tag) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, start, tag); + } +}; +} // namespace internal + /// Async client-side interface for bi-directional streaming, /// where the outgoing message stream going to the server /// has messages of type \a W, and the incoming message stream coming @@ -457,26 +491,6 @@ template class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface { public: - struct internal { - /// Create a stream object. - /// Start the RPC request if \a start is set. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent). If \a start is not set, \a tag must be - /// nullptr and the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - static ClientAsyncReaderWriter* Create( - ChannelInterface* channel, CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, ClientContext* context, - bool start, void* tag) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncReaderWriter))) - ClientAsyncReaderWriter(call, context, start, tag); - } - }; - // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { assert(size == sizeof(ClientAsyncReaderWriter)); @@ -556,6 +570,7 @@ class ClientAsyncReaderWriter final } private: + friend class internal::ClientAsyncReaderWriterFactory; ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context, bool start, void* tag) : context_(context), call_(call), started_(start) { diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 8a3dfbc4db0..6d51c78d5cc 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -69,31 +69,35 @@ class ClientAsyncResponseReaderInterface { virtual void Finish(R* msg, Status* status, void* tag) = 0; }; +namespace internal { +template +class ClientAsyncResponseReaderFactory { + public: + /// Start a call and write the request out if \a start is set. + /// \a tag will be notified on \a cq when the call has been started (i.e. + /// intitial metadata sent) and \a request has been written out. + /// If \a start is not set, the actual call must be initiated by StartCall + /// Note that \a context will be used to fill in custom initial metadata + /// used to send to the server when starting the call. + template + static ClientAsyncResponseReader* Create( + ChannelInterface* channel, CompletionQueue* cq, + const ::grpc::internal::RpcMethod& method, ClientContext* context, + const W& request, bool start) { + ::grpc::internal::Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request, start); + } +}; +} // namespace internal + /// Async API for client-side unary RPCs, where the message response /// received from the server is of type \a R. template class ClientAsyncResponseReader final : public ClientAsyncResponseReaderInterface { public: - struct internal { - /// Start a call and write the request out if \a start is set. - /// \a tag will be notified on \a cq when the call has been started (i.e. - /// intitial metadata sent) and \a request has been written out. - /// If \a start is not set, the actual call must be initiated by StartCall - /// Note that \a context will be used to fill in custom initial metadata - /// used to send to the server when starting the call. - template - static ClientAsyncResponseReader* Create( - ChannelInterface* channel, CompletionQueue* cq, - const ::grpc::internal::RpcMethod& method, ClientContext* context, - const W& request, bool start) { - ::grpc::internal::Call call = channel->CreateCall(method, context, cq); - return new (g_core_codegen_interface->grpc_call_arena_alloc( - call.call(), sizeof(ClientAsyncResponseReader))) - ClientAsyncResponseReader(call, context, request, start); - } - }; - // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { assert(size == sizeof(ClientAsyncResponseReader)); @@ -138,6 +142,7 @@ class ClientAsyncResponseReader final } private: + friend class internal::ClientAsyncResponseReaderFactory; ClientContext* const context_; ::grpc::internal::Call call_; bool started_; diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h index 41c213f64a9..769f8539747 100644 --- a/include/grpc++/impl/codegen/channel_interface.h +++ b/include/grpc++/impl/codegen/channel_interface.h @@ -34,14 +34,6 @@ template class ClientWriter; template class ClientReaderWriter; -template -class ClientAsyncReader; -template -class ClientAsyncWriter; -template -class ClientAsyncReaderWriter; -template -class ClientAsyncResponseReader; namespace internal { class Call; @@ -49,6 +41,14 @@ class CallOpSetInterface; class RpcMethod; template class BlockingUnaryCallImpl; +template +class ClientAsyncReaderFactory; +template +class ClientAsyncWriterFactory; +template +class ClientAsyncReaderWriterFactory; +template +class ClientAsyncResponseReaderFactory; } // namespace internal /// Codegen interface for \a grpc::Channel. @@ -94,13 +94,13 @@ class ChannelInterface { template friend class ::grpc::ClientReaderWriter; template - friend class ::grpc::ClientAsyncReader; + friend class ::grpc::internal::ClientAsyncReaderFactory; template - friend class ::grpc::ClientAsyncWriter; + friend class ::grpc::internal::ClientAsyncWriterFactory; template - friend class ::grpc::ClientAsyncReaderWriter; + friend class ::grpc::internal::ClientAsyncReaderWriterFactory; template - friend class ::grpc::ClientAsyncResponseReader; + friend class ::grpc::internal::ClientAsyncResponseReaderFactory; template friend class ::grpc::internal::BlockingUnaryCallImpl; friend class ::grpc::internal::RpcMethod; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 49ace0beeb6..a6dd26fb008 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -156,21 +156,25 @@ class ClientReaderInterface : public internal::ClientStreamingInterface, virtual void WaitForInitialMetadata() = 0; }; +namespace internal { +template +class ClientReaderFactory { + public: + template + static ClientReader* Create(ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, const W& request) { + return new ClientReader(channel, method, context, request); + } +}; +} // namespace internal + /// Synchronous (blocking) client-side API for doing server-streaming RPCs, /// where the stream of messages coming from the server has messages /// of type \a R. template class ClientReader final : public ClientReaderInterface { public: - struct internal { - template - static ClientReader* Create(ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, const W& request) { - return new ClientReader(channel, method, context, request); - } - }; - /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. /// @@ -225,6 +229,7 @@ class ClientReader final : public ClientReaderInterface { } private: + friend class internal::ClientReaderFactory; ClientContext* context_; CompletionQueue cq_; ::grpc::internal::Call call_; @@ -269,21 +274,25 @@ class ClientWriterInterface : public internal::ClientStreamingInterface, virtual bool WritesDone() = 0; }; +namespace internal { +template +class ClientWriterFactory { + public: + template + static ClientWriter* Create(::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, + ClientContext* context, R* response) { + return new ClientWriter(channel, method, context, response); + } +}; +} // namespace internal + /// Synchronous (blocking) client-side API for doing client-streaming RPCs, /// where the outgoing message stream coming from the client has messages of /// type \a W. template class ClientWriter : public ClientWriterInterface { public: - struct internal { - template - static ClientWriter* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context, R* response) { - return new ClientWriter(channel, method, context, response); - } - }; - /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for /// semantics. /// @@ -355,12 +364,13 @@ class ClientWriter : public ClientWriterInterface { } private: + friend class internal::ClientWriterFactory; + /// Block to create a stream (i.e. send request headers and other initial /// metadata to the server). Note that \a context will be used to fill /// in custom initial metadata. \a response will be filled in with the /// single expected response message from the server upon a successful /// call to the \a Finish method of this instance. - template ClientWriter(ChannelInterface* channel, const ::grpc::internal::RpcMethod& method, @@ -415,6 +425,18 @@ class ClientReaderWriterInterface : public internal::ClientStreamingInterface, virtual bool WritesDone() = 0; }; +namespace internal { +template +class ClientReaderWriterFactory { + public: + static ClientReaderWriter* Create( + ::grpc::ChannelInterface* channel, + const ::grpc::internal::RpcMethod& method, ClientContext* context) { + return new ClientReaderWriter(channel, method, context); + } +}; +} // namespace internal + /// Synchronous (blocking) client-side API for bi-directional streaming RPCs, /// where the outgoing message stream coming from the client has messages of /// type \a W, and the incoming messages stream coming from the server has @@ -422,14 +444,6 @@ class ClientReaderWriterInterface : public internal::ClientStreamingInterface, template class ClientReaderWriter final : public ClientReaderWriterInterface { public: - struct internal { - static ClientReaderWriter* Create(::grpc::ChannelInterface* channel, - const ::grpc::internal::RpcMethod& method, - ClientContext* context) { - return new ClientReaderWriter(channel, method, context); - } - }; - /// Block waiting to read initial metadata from the server. /// This call is optional, but if it is used, it cannot be used concurrently /// with or after the \a Finish method. @@ -523,6 +537,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { } private: + friend class internal::ClientReaderWriterFactory; + ClientContext* context_; CompletionQueue cq_; ::grpc::internal::Call call_; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 9efd6208b0e..253280bd244 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -1203,8 +1203,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, printer->Print( *vars, " return " - "::grpc::ClientAsyncResponseReader< $Response$>::internal::Create(" - "channel_.get(), cq, " + "::grpc::internal::ClientAsyncResponseReaderFactory< $Response$>" + "::Create(channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request, $AsyncStart$);\n" "}\n\n"); @@ -1216,7 +1216,7 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Print( *vars, - " return ::grpc::ClientWriter< $Request$>::internal::Create(" + " return ::grpc::internal::ClientWriterFactory< $Request$>::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context, response);\n" @@ -1233,8 +1233,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); printer->Print( *vars, - " return ::grpc::ClientAsyncWriter< $Request$>::internal::Create(" - "channel_.get(), cq, " + " return ::grpc::internal::ClientAsyncWriterFactory< $Request$>" + "::Create(channel_.get(), cq, " "rpcmethod_$Method$_, " "context, response, $AsyncStart$$AsyncCreateArgs$);\n" "}\n\n"); @@ -1247,7 +1247,7 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, const $Request$& request) {\n"); printer->Print( *vars, - " return ::grpc::ClientReader< $Response$>::internal::Create(" + " return ::grpc::internal::ClientReaderFactory< $Response$>::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context, request);\n" @@ -1265,8 +1265,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); printer->Print( *vars, - " return ::grpc::ClientAsyncReader< $Response$>::internal::Create(" - "channel_.get(), cq, " + " return ::grpc::internal::ClientAsyncReaderFactory< $Response$>" + "::Create(channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request, $AsyncStart$$AsyncCreateArgs$);\n" "}\n\n"); @@ -1277,8 +1277,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n"); printer->Print(*vars, - " return ::grpc::ClientReaderWriter< " - "$Request$, $Response$>::internal::Create(" + " return ::grpc::internal::ClientReaderWriterFactory< " + "$Request$, $Response$>::Create(" "channel_.get(), " "rpcmethod_$Method$_, " "context);\n" @@ -1295,8 +1295,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n"); printer->Print(*vars, " return " - "::grpc::ClientAsyncReaderWriter< $Request$, " - "$Response$>::internal::Create(" + "::grpc::internal::ClientAsyncReaderWriterFactory< " + "$Request$, $Response$>::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, $AsyncStart$$AsyncCreateArgs$);\n" diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 452a8a7501c..fc18ce90935 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -27,7 +27,7 @@ std::unique_ptr CallInternal( ChannelInterface* channel, ClientContext* context, const grpc::string& method, CompletionQueue* cq, bool start, void* tag) { return std::unique_ptr( - GenericClientAsyncReaderWriter::internal::Create( + internal::ClientAsyncReaderWriterFactory::Create( channel, cq, internal::RpcMethod(method.c_str(), internal::RpcMethod::BIDI_STREAMING), context, start, tag)); @@ -53,7 +53,7 @@ std::unique_ptr GenericStub::PrepareUnaryCall( ClientContext* context, const grpc::string& method, const ByteBuffer& request, CompletionQueue* cq) { return std::unique_ptr( - GenericClientAsyncResponseReader::internal::Create( + internal::ClientAsyncResponseReaderFactory::Create( channel_.get(), cq, internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC), context, request, false));