From 891b6cf7aea6e0638c80f6d23af7f53e2cd59c9b Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 10 Apr 2019 10:22:59 -0700 Subject: [PATCH] Revert "Folding CompletionQueue and ServerCompletionQueue." --- BUILD | 1 - BUILD.gn | 1 - CMakeLists.txt | 5 - Makefile | 5 - build.yaml | 1 - gRPC-C++.podspec | 1 - include/grpcpp/channel_impl.h | 13 +- include/grpcpp/create_channel_impl.h | 6 +- include/grpcpp/generic/generic_stub_impl.h | 2 +- include/grpcpp/impl/codegen/async_stream.h | 2 + .../grpcpp/impl/codegen/async_unary_call.h | 1 + include/grpcpp/impl/codegen/call.h | 14 +- include/grpcpp/impl/codegen/call_op_set.h | 1 + .../grpcpp/impl/codegen/channel_interface.h | 16 +- include/grpcpp/impl/codegen/client_callback.h | 1 + include/grpcpp/impl/codegen/client_context.h | 1 + .../grpcpp/impl/codegen/client_unary_call.h | 2 + .../grpcpp/impl/codegen/completion_queue.h | 379 +++++++++++++++- .../impl/codegen/completion_queue_impl.h | 422 ------------------ .../grpcpp/impl/codegen/intercepted_channel.h | 13 +- include/grpcpp/impl/codegen/server_context.h | 6 +- .../grpcpp/impl/codegen/server_interface.h | 72 ++- include/grpcpp/impl/codegen/service_type.h | 2 + include/grpcpp/server_builder.h | 7 + include/grpcpp/server_builder_impl.h | 4 +- src/compiler/cpp_generator.cc | 4 +- src/cpp/common/completion_queue_cc.cc | 12 +- test/cpp/codegen/compiler_test_golden | 4 +- tools/doxygen/Doxyfile.c++ | 1 - tools/doxygen/Doxyfile.c++.internal | 1 - .../generated/sources_and_headers.json | 2 - 31 files changed, 466 insertions(+), 536 deletions(-) delete mode 100644 include/grpcpp/impl/codegen/completion_queue_impl.h diff --git a/BUILD b/BUILD index ec9f58dbed1..74cf5413ba1 100644 --- a/BUILD +++ b/BUILD @@ -2130,7 +2130,6 @@ grpc_cc_library( "include/grpcpp/impl/codegen/client_interceptor.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", - "include/grpcpp/impl/codegen/completion_queue_impl.h", "include/grpcpp/impl/codegen/completion_queue_tag.h", "include/grpcpp/impl/codegen/config.h", "include/grpcpp/impl/codegen/core_codegen_interface.h", diff --git a/BUILD.gn b/BUILD.gn index 78e434477b8..081de0cbfeb 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -1035,7 +1035,6 @@ config("grpc_config") { "include/grpcpp/impl/codegen/client_interceptor.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", - "include/grpcpp/impl/codegen/completion_queue_impl.h", "include/grpcpp/impl/codegen/completion_queue_tag.h", "include/grpcpp/impl/codegen/config.h", "include/grpcpp/impl/codegen/config_protobuf.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 6d5e448d688..1a6aa0307bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3157,7 +3157,6 @@ foreach(_hdr include/grpcpp/impl/codegen/client_interceptor.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h - include/grpcpp/impl/codegen/completion_queue_impl.h include/grpcpp/impl/codegen/completion_queue_tag.h include/grpcpp/impl/codegen/config.h include/grpcpp/impl/codegen/core_codegen_interface.h @@ -3763,7 +3762,6 @@ foreach(_hdr include/grpcpp/impl/codegen/client_interceptor.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h - include/grpcpp/impl/codegen/completion_queue_impl.h include/grpcpp/impl/codegen/completion_queue_tag.h include/grpcpp/impl/codegen/config.h include/grpcpp/impl/codegen/core_codegen_interface.h @@ -4197,7 +4195,6 @@ foreach(_hdr include/grpcpp/impl/codegen/client_interceptor.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h - include/grpcpp/impl/codegen/completion_queue_impl.h include/grpcpp/impl/codegen/completion_queue_tag.h include/grpcpp/impl/codegen/config.h include/grpcpp/impl/codegen/core_codegen_interface.h @@ -4395,7 +4392,6 @@ foreach(_hdr include/grpcpp/impl/codegen/client_interceptor.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h - include/grpcpp/impl/codegen/completion_queue_impl.h include/grpcpp/impl/codegen/completion_queue_tag.h include/grpcpp/impl/codegen/config.h include/grpcpp/impl/codegen/core_codegen_interface.h @@ -4747,7 +4743,6 @@ foreach(_hdr include/grpcpp/impl/codegen/client_interceptor.h include/grpcpp/impl/codegen/client_unary_call.h include/grpcpp/impl/codegen/completion_queue.h - include/grpcpp/impl/codegen/completion_queue_impl.h include/grpcpp/impl/codegen/completion_queue_tag.h include/grpcpp/impl/codegen/config.h include/grpcpp/impl/codegen/core_codegen_interface.h diff --git a/Makefile b/Makefile index c3c7412590d..4cbcc0be757 100644 --- a/Makefile +++ b/Makefile @@ -5492,7 +5492,6 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ - include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/core_codegen_interface.h \ @@ -6106,7 +6105,6 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ - include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/core_codegen_interface.h \ @@ -6512,7 +6510,6 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ - include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/core_codegen_interface.h \ @@ -6681,7 +6678,6 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ - include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/core_codegen_interface.h \ @@ -7039,7 +7035,6 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ - include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/core_codegen_interface.h \ diff --git a/build.yaml b/build.yaml index 8752447f125..51422e22adb 100644 --- a/build.yaml +++ b/build.yaml @@ -1248,7 +1248,6 @@ filegroups: - include/grpcpp/impl/codegen/client_interceptor.h - include/grpcpp/impl/codegen/client_unary_call.h - include/grpcpp/impl/codegen/completion_queue.h - - include/grpcpp/impl/codegen/completion_queue_impl.h - include/grpcpp/impl/codegen/completion_queue_tag.h - include/grpcpp/impl/codegen/config.h - include/grpcpp/impl/codegen/core_codegen_interface.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 7c90454f1a9..e9a41c9e758 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -160,7 +160,6 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/codegen/client_interceptor.h', 'include/grpcpp/impl/codegen/client_unary_call.h', 'include/grpcpp/impl/codegen/completion_queue.h', - 'include/grpcpp/impl/codegen/completion_queue_impl.h', 'include/grpcpp/impl/codegen/completion_queue_tag.h', 'include/grpcpp/impl/codegen/config.h', 'include/grpcpp/impl/codegen/core_codegen_interface.h', diff --git a/include/grpcpp/channel_impl.h b/include/grpcpp/channel_impl.h index 19422d8d30b..26850f8d30f 100644 --- a/include/grpcpp/channel_impl.h +++ b/include/grpcpp/channel_impl.h @@ -34,7 +34,6 @@ struct grpc_channel; namespace grpc_impl { -class CompletionQueue; namespace experimental { /// Resets the channel's connection backoff. /// TODO(roth): Once we see whether this proves useful, either create a gRFC @@ -78,22 +77,22 @@ class Channel final : public ::grpc::ChannelInterface, ::grpc::internal::Call CreateCall(const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, - CompletionQueue* cq) override; + ::grpc::CompletionQueue* cq) override; void PerformOpsOnCall(::grpc::internal::CallOpSetInterface* ops, ::grpc::internal::Call* call) override; void* RegisterMethod(const char* method) override; void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline, CompletionQueue* cq, - void* tag) override; + gpr_timespec deadline, + ::grpc::CompletionQueue* cq, void* tag) override; bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) override; - CompletionQueue* CallbackCQ() override; + ::grpc::CompletionQueue* CallbackCQ() override; ::grpc::internal::Call CreateCallInternal( const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, - CompletionQueue* cq, size_t interceptor_pos) override; + ::grpc::CompletionQueue* cq, size_t interceptor_pos) override; const grpc::string host_; grpc_channel* const c_channel_; // owned @@ -105,7 +104,7 @@ class Channel final : public ::grpc::ChannelInterface, // with this channel (if any). It is set on the first call to CallbackCQ(). // It is _not owned_ by the channel; ownership belongs with its internal // shutdown callback tag (invoked when the CQ is fully shutdown). - CompletionQueue* callback_cq_ = nullptr; + ::grpc::CompletionQueue* callback_cq_ = nullptr; std::vector< std::unique_ptr<::grpc::experimental::ClientInterceptorFactoryInterface>> diff --git a/include/grpcpp/create_channel_impl.h b/include/grpcpp/create_channel_impl.h index 84b6c0e643d..214a537a3cb 100644 --- a/include/grpcpp/create_channel_impl.h +++ b/include/grpcpp/create_channel_impl.h @@ -35,7 +35,7 @@ namespace grpc_impl { /// \param creds Credentials to use for the created channel. If it does not /// hold an object or is invalid, a lame channel (one on which all operations /// fail) is returned. -std::shared_ptr CreateChannel( +std::shared_ptr CreateChannel( const grpc::string& target, const std::shared_ptr& creds); @@ -49,7 +49,7 @@ std::shared_ptr CreateChannel( /// hold an object or is invalid, a lame channel (one on which all operations /// fail) is returned. /// \param args Options for channel creation. -std::shared_ptr CreateCustomChannel( +std::shared_ptr CreateCustomChannel( const grpc::string& target, const std::shared_ptr& creds, const grpc::ChannelArguments& args); @@ -66,7 +66,7 @@ namespace experimental { /// hold an object or is invalid, a lame channel (one on which all operations /// fail) is returned. /// \param args Options for channel creation. -std::shared_ptr CreateCustomChannelWithInterceptors( +std::shared_ptr CreateCustomChannelWithInterceptors( const grpc::string& target, const std::shared_ptr& creds, const grpc::ChannelArguments& args, diff --git a/include/grpcpp/generic/generic_stub_impl.h b/include/grpcpp/generic/generic_stub_impl.h index ff1223453c1..2d6a11de923 100644 --- a/include/grpcpp/generic/generic_stub_impl.h +++ b/include/grpcpp/generic/generic_stub_impl.h @@ -29,12 +29,12 @@ namespace grpc { +class CompletionQueue; typedef ClientAsyncReaderWriter GenericClientAsyncReaderWriter; typedef ClientAsyncResponseReader GenericClientAsyncResponseReader; } // namespace grpc namespace grpc_impl { -class CompletionQueue; /// Generic stubs provide a type-unsafe interface to call gRPC methods /// by name. diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index 6b8a9df7b29..e12923ea02c 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -28,6 +28,8 @@ namespace grpc { +class CompletionQueue; + namespace internal { /// Common interface for all client side asynchronous streaming. class ClientAsyncStreamingInterface { diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h index 4b97cf29018..89dcb124189 100644 --- a/include/grpcpp/impl/codegen/async_unary_call.h +++ b/include/grpcpp/impl/codegen/async_unary_call.h @@ -29,6 +29,7 @@ namespace grpc { +class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; /// An interface relevant for async client side unary RPCs (which send diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index eefa4a7f9cd..c040c30dd9d 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -21,11 +21,9 @@ #include #include -namespace grpc_impl { +namespace grpc { class CompletionQueue; -} -namespace grpc { namespace experimental { class ClientRpcInfo; class ServerRpcInfo; @@ -43,13 +41,13 @@ class Call final { call_(nullptr), max_receive_message_size_(-1) {} /** call is owned by the caller */ - Call(grpc_call* call, CallHook* call_hook, ::grpc_impl::CompletionQueue* cq) + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) : call_hook_(call_hook), cq_(cq), call_(call), max_receive_message_size_(-1) {} - Call(grpc_call* call, CallHook* call_hook, ::grpc_impl::CompletionQueue* cq, + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, experimental::ClientRpcInfo* rpc_info) : call_hook_(call_hook), cq_(cq), @@ -57,7 +55,7 @@ class Call final { max_receive_message_size_(-1), client_rpc_info_(rpc_info) {} - Call(grpc_call* call, CallHook* call_hook, ::grpc_impl::CompletionQueue* cq, + Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, int max_receive_message_size, experimental::ServerRpcInfo* rpc_info) : call_hook_(call_hook), cq_(cq), @@ -70,7 +68,7 @@ class Call final { } grpc_call* call() const { return call_; } - ::grpc_impl::CompletionQueue* cq() const { return cq_; } + CompletionQueue* cq() const { return cq_; } int max_receive_message_size() const { return max_receive_message_size_; } @@ -84,7 +82,7 @@ class Call final { private: CallHook* call_hook_; - ::grpc_impl::CompletionQueue* cq_; + CompletionQueue* cq_; grpc_call* call_; int max_receive_message_size_; experimental::ClientRpcInfo* client_rpc_info_ = nullptr; diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index d810625b3e4..4ca87a99fca 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -48,6 +48,7 @@ namespace grpc { +class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; namespace internal { diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index faa5e41cef9..5353f5feaa6 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -24,13 +24,10 @@ #include #include -namespace grpc_impl { -class CompletionQueue; -} - namespace grpc { class ChannelInterface; class ClientContext; +class CompletionQueue; template class ClientReader; @@ -76,7 +73,7 @@ class ChannelInterface { /// deadline expires. \a GetState needs to called to get the current state. template void NotifyOnStateChange(grpc_connectivity_state last_observed, T deadline, - ::grpc_impl::CompletionQueue* cq, void* tag) { + CompletionQueue* cq, void* tag) { TimePoint deadline_tp(deadline); NotifyOnStateChangeImpl(last_observed, deadline_tp.raw_time(), cq, tag); } @@ -128,14 +125,13 @@ class ChannelInterface { friend class ::grpc::internal::InterceptedChannel; virtual internal::Call CreateCall(const internal::RpcMethod& method, ClientContext* context, - ::grpc_impl::CompletionQueue* cq) = 0; + CompletionQueue* cq) = 0; virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, internal::Call* call) = 0; virtual void* RegisterMethod(const char* method) = 0; virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline, - ::grpc_impl::CompletionQueue* cq, - void* tag) = 0; + CompletionQueue* cq, void* tag) = 0; virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, gpr_timespec deadline) = 0; @@ -148,7 +144,7 @@ class ChannelInterface { // change (even though this is private and non-API) virtual internal::Call CreateCallInternal(const internal::RpcMethod& method, ClientContext* context, - ::grpc_impl::CompletionQueue* cq, + CompletionQueue* cq, size_t interceptor_pos) { return internal::Call(); } @@ -161,7 +157,7 @@ class ChannelInterface { // Returns nullptr (rather than being pure) since this is a post-1.0 method // and adding a new pure method to an interface would be a breaking change // (even though this is private and non-API) - virtual ::grpc_impl::CompletionQueue* CallbackCQ() { return nullptr; } + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index a00d49ef30c..836200b5915 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -36,6 +36,7 @@ class Channel; namespace grpc { class ClientContext; +class CompletionQueue; namespace internal { class RpcMethod; diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 19e96d3589f..3b74a1367d1 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -65,6 +65,7 @@ class Channel; namespace grpc { class ChannelInterface; +class CompletionQueue; class CallCredentials; class ClientContext; diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h index e0f692b1783..b9f8e1663f1 100644 --- a/include/grpcpp/impl/codegen/client_unary_call.h +++ b/include/grpcpp/impl/codegen/client_unary_call.h @@ -27,7 +27,9 @@ namespace grpc { +class Channel; class ClientContext; +class CompletionQueue; namespace internal { class RpcMethod; diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index edd1c2e97c8..508ab05815d 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -32,12 +32,385 @@ #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H -#include +#include +#include +#include +#include +#include +#include +struct grpc_completion_queue; + +namespace grpc_impl { + +class Channel; +class Server; +class ServerBuilder; +} // namespace grpc_impl namespace grpc { -typedef ::grpc_impl::CompletionQueue CompletionQueue; -typedef ::grpc_impl::ServerCompletionQueue ServerCompletionQueue; +template +class ClientReader; +template +class ClientWriter; +template +class ClientReaderWriter; +template +class ServerReader; +template +class ServerWriter; +namespace internal { +template +class ServerReaderWriterBody; +} // namespace internal + +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class ServerContext; +class ServerInterface; + +namespace internal { +class CompletionQueueTag; +class RpcMethod; +template +class RpcMethodHandler; +template +class ClientStreamingHandler; +template +class ServerStreamingHandler; +template +class BidiStreamingHandler; +template +class TemplatedBidiStreamingHandler; +template +class ErrorMethodHandler; +template +class BlockingUnaryCallImpl; +template +class CallOpSet; +} // namespace internal + +extern CoreCodegenInterface* g_core_codegen_interface; + +/// A thin wrapper around \ref grpc_completion_queue (see \ref +/// src/core/lib/surface/completion_queue.h). +/// See \ref doc/cpp/perf_notes.md for notes on best practices for high +/// performance servers. +class CompletionQueue : private GrpcLibraryCodegen { + public: + /// Default constructor. Implicitly creates a \a grpc_completion_queue + /// instance. + CompletionQueue() + : CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, + nullptr}) {} + + /// Wrap \a take, taking ownership of the instance. + /// + /// \param take The completion queue instance to wrap. Ownership is taken. + explicit CompletionQueue(grpc_completion_queue* take); + + /// Destructor. Destroys the owned wrapped completion queue / instance. + ~CompletionQueue() { + g_core_codegen_interface->grpc_completion_queue_destroy(cq_); + } + + /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. + enum NextStatus { + SHUTDOWN, ///< The completion queue has been shutdown and fully-drained + GOT_EVENT, ///< Got a new event; \a tag will be filled in with its + ///< associated value; \a ok indicating its success. + TIMEOUT ///< deadline was reached. + }; + + /// Read from the queue, blocking until an event is available or the queue is + /// shutting down. + /// + /// \param tag [out] Updated to point to the read event's tag. + /// \param ok [out] true if read a successful event, false otherwise. + /// + /// Note that each tag sent to the completion queue (through RPC operations + /// or alarms) will be delivered out of the completion queue by a call to + /// Next (or a related method), regardless of whether the operation succeeded + /// or not. Success here means that this operation completed in the normal + /// valid manner. + /// + /// Server-side RPC request: \a ok indicates that the RPC has indeed + /// been started. If it is false, the server has been Shutdown + /// before this particular call got matched to an incoming RPC. + /// + /// Client-side StartCall/RPC invocation: \a ok indicates that the RPC is + /// going to go to the wire. If it is false, it not going to the wire. This + /// would happen if the channel is either permanently broken or + /// transiently broken but with the fail-fast option. (Note that async unary + /// RPCs don't post a CQ tag at this point, nor do client-streaming + /// or bidi-streaming RPCs that have the initial metadata corked option set.) + /// + /// Client-side Write, Client-side WritesDone, Server-side Write, + /// Server-side Finish, Server-side SendInitialMetadata (which is + /// typically included in Write or Finish when not done explicitly): + /// \a ok means that the data/metadata/status/etc is going to go to the + /// wire. If it is false, it not going to the wire because the call + /// is already dead (i.e., canceled, deadline expired, other side + /// dropped the channel, etc). + /// + /// Client-side Read, Server-side Read, Client-side + /// RecvInitialMetadata (which is typically included in Read if not + /// done explicitly): \a ok indicates whether there is a valid message + /// that got read. If not, you know that there are certainly no more + /// messages that can ever be read from this stream. For the client-side + /// operations, this only happens because the call is dead. For the + /// server-sider operation, though, this could happen because the client + /// has done a WritesDone already. + /// + /// Client-side Finish: \a ok should always be true + /// + /// Server-side AsyncNotifyWhenDone: \a ok should always be true + /// + /// Alarm: \a ok is true if it expired, false if it was canceled + /// + /// \return true if got an event, false if the queue is fully drained and + /// shut down. + bool Next(void** tag, bool* ok) { + return (AsyncNextInternal(tag, ok, + g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) != SHUTDOWN); + } + + /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param tag [out] Upon sucess, updated to point to the event's tag. + /// \param ok [out] Upon sucess, true if a successful event, false otherwise + /// See documentation for CompletionQueue::Next for explanation of ok + /// \param deadline [in] How long to block in wait for an event. + /// + /// \return The type of event read. + template + NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { + TimePoint deadline_tp(deadline); + return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); + } + + /// EXPERIMENTAL + /// First executes \a F, then reads from the queue, blocking up to + /// \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param f [in] Function to execute before calling AsyncNext on this queue. + /// \param tag [out] Upon sucess, updated to point to the event's tag. + /// \param ok [out] Upon sucess, true if read a regular event, false + /// otherwise. + /// \param deadline [in] How long to block in wait for an event. + /// + /// \return The type of event read. + template + NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) { + CompletionQueueTLSCache cache = CompletionQueueTLSCache(this); + f(); + if (cache.Flush(tag, ok)) { + return GOT_EVENT; + } else { + return AsyncNext(tag, ok, deadline); + } + } + + /// Request the shutdown of the queue. + /// + /// \warning This method must be called at some point if this completion queue + /// is accessed with Next or AsyncNext. \a Next will not return false + /// until this method has been called and all pending tags have been drained. + /// (Likewise for \a AsyncNext returning \a NextStatus::SHUTDOWN .) + /// Only once either one of these methods does that (that is, once the queue + /// has been \em drained) can an instance of this class be destroyed. + /// Also note that applications must ensure that no work is enqueued on this + /// completion queue after this method is called. + void Shutdown(); + + /// Returns a \em raw pointer to the underlying \a grpc_completion_queue + /// instance. + /// + /// \warning Remember that the returned instance is owned. No transfer of + /// owership is performed. + grpc_completion_queue* cq() { return cq_; } + + protected: + /// Private constructor of CompletionQueue only visible to friend classes + CompletionQueue(const grpc_completion_queue_attributes& attributes) { + cq_ = g_core_codegen_interface->grpc_completion_queue_create( + g_core_codegen_interface->grpc_completion_queue_factory_lookup( + &attributes), + &attributes, NULL); + InitialAvalanching(); // reserve this for the future shutdown + } + + private: + // Friend synchronous wrappers so that they can access Pluck(), which is + // a semi-private API geared towards the synchronous implementation. + template + friend class ::grpc::ClientReader; + template + friend class ::grpc::ClientWriter; + template + friend class ::grpc::ClientReaderWriter; + template + friend class ::grpc::ServerReader; + template + friend class ::grpc::ServerWriter; + template + friend class ::grpc::internal::ServerReaderWriterBody; + template + friend class ::grpc::internal::RpcMethodHandler; + template + friend class ::grpc::internal::ClientStreamingHandler; + template + friend class ::grpc::internal::ServerStreamingHandler; + template + friend class ::grpc::internal::TemplatedBidiStreamingHandler; + template + friend class ::grpc::internal::ErrorMethodHandler; + friend class ::grpc_impl::Server; + friend class ::grpc::ServerContext; + friend class ::grpc::ServerInterface; + template + friend class ::grpc::internal::BlockingUnaryCallImpl; + + // Friends that need access to constructor for callback CQ + friend class ::grpc_impl::Channel; + + // For access to Register/CompleteAvalanching + template + friend class ::grpc::internal::CallOpSet; + + /// EXPERIMENTAL + /// Creates a Thread Local cache to store the first event + /// On this completion queue queued from this thread. Once + /// initialized, it must be flushed on the same thread. + class CompletionQueueTLSCache { + public: + CompletionQueueTLSCache(CompletionQueue* cq); + ~CompletionQueueTLSCache(); + bool Flush(void** tag, bool* ok); + + private: + CompletionQueue* cq_; + bool flushed_; + }; + + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); + + /// Wraps \a grpc_completion_queue_pluck. + /// \warning Must not be mixed with calls to \a Next. + bool Pluck(internal::CompletionQueueTag* tag) { + auto deadline = + g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); + while (true) { + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + bool ok = ev.success != 0; + void* ignored = tag; + if (tag->FinalizeResult(&ignored, &ok)) { + GPR_CODEGEN_ASSERT(ignored == tag); + return ok; + } + } + } + + /// Performs a single polling pluck on \a tag. + /// \warning Must not be mixed with calls to \a Next. + /// + /// TODO: sreek - This calls tag->FinalizeResult() even if the cq_ is already + /// shutdown. This is most likely a bug and if it is a bug, then change this + /// implementation to simple call the other TryPluck function with a zero + /// timeout. i.e: + /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) + void TryPluck(internal::CompletionQueueTag* tag) { + auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT) return; + bool ok = ev.success != 0; + void* ignored = tag; + // the tag must be swallowed if using TryPluck + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } + + /// Performs a single polling pluck on \a tag. Calls tag->FinalizeResult if + /// the pluck() was successful and returned the tag. + /// + /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects + /// that the tag is internal not something that is returned to the user. + void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) { + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { + return; + } + + bool ok = ev.success != 0; + void* ignored = tag; + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } + + /// Manage state of avalanching operations : completion queue tags that + /// trigger other completion queue operations. The underlying core completion + /// queue should not really shutdown until all avalanching operations have + /// been finalized. Note that we maintain the requirement that an avalanche + /// registration must take place before CQ shutdown (which must be maintained + /// elsehwere) + void InitialAvalanching() { + gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); + } + void RegisterAvalanching() { + gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(1)); + } + void CompleteAvalanching() { + if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(-1)) == 1) { + g_core_codegen_interface->grpc_completion_queue_shutdown(cq_); + } + } + + grpc_completion_queue* cq_; // owned + + gpr_atm avalanches_in_flight_; +}; + +/// A specific type of completion queue used by the processing of notifications +/// by servers. Instantiated by \a ServerBuilder. +class ServerCompletionQueue : public CompletionQueue { + public: + bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } + + protected: + /// Default constructor + ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} + + private: + /// \param completion_type indicates whether this is a NEXT or CALLBACK + /// completion queue. + /// \param polling_type Informs the GRPC library about the type of polling + /// allowed on this completion queue. See grpc_cq_polling_type's description + /// in grpc_types.h for more details. + /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues + ServerCompletionQueue(grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, + grpc_experimental_completion_queue_functor* shutdown_cb) + : CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, completion_type, polling_type, + shutdown_cb}), + polling_type_(polling_type) {} + + grpc_cq_polling_type polling_type_; + friend class grpc_impl::ServerBuilder; + friend class grpc_impl::Server; +}; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/completion_queue_impl.h b/include/grpcpp/impl/codegen/completion_queue_impl.h deleted file mode 100644 index 5435f2fa165..00000000000 --- a/include/grpcpp/impl/codegen/completion_queue_impl.h +++ /dev/null @@ -1,422 +0,0 @@ -/* - * - * Copyright 2015-2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/// A completion queue implements a concurrent producer-consumer queue, with -/// two main API-exposed methods: \a Next and \a AsyncNext. These -/// methods are the essential component of the gRPC C++ asynchronous API. -/// There is also a \a Shutdown method to indicate that a given completion queue -/// will no longer have regular events. This must be called before the -/// completion queue is destroyed. -/// All completion queue APIs are thread-safe and may be used concurrently with -/// any other completion queue API invocation; it is acceptable to have -/// multiple threads calling \a Next or \a AsyncNext on the same or different -/// completion queues, or to call these methods concurrently with a \a Shutdown -/// elsewhere. -/// \remark{All other API calls on completion queue should be completed before -/// a completion queue destructor is called.} -#ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H -#define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H - -#include -#include -#include -#include -#include -#include - -struct grpc_completion_queue; - -namespace grpc_impl { - -class Channel; -class Server; -class ServerBuilder; -} // namespace grpc_impl -namespace grpc { - -template -class ClientReader; -template -class ClientWriter; -template -class ClientReaderWriter; -template -class ServerReader; -template -class ServerWriter; -namespace internal { -template -class ServerReaderWriterBody; -} // namespace internal - -class ChannelInterface; -class ClientContext; -class ServerContext; -class ServerInterface; - -namespace internal { -class CompletionQueueTag; -class RpcMethod; -template -class RpcMethodHandler; -template -class ClientStreamingHandler; -template -class ServerStreamingHandler; -template -class BidiStreamingHandler; -template -class TemplatedBidiStreamingHandler; -template -class ErrorMethodHandler; -template -class BlockingUnaryCallImpl; -template -class CallOpSet; -} // namespace internal - -extern CoreCodegenInterface* g_core_codegen_interface; - -} // namespace grpc - -namespace grpc_impl { - -/// A thin wrapper around \ref grpc_completion_queue (see \ref -/// src/core/lib/surface/completion_queue.h). -/// See \ref doc/cpp/perf_notes.md for notes on best practices for high -/// performance servers. -class CompletionQueue : private ::grpc::GrpcLibraryCodegen { - public: - /// Default constructor. Implicitly creates a \a grpc_completion_queue - /// instance. - CompletionQueue() - : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, - nullptr}) {} - - /// Wrap \a take, taking ownership of the instance. - /// - /// \param take The completion queue instance to wrap. Ownership is taken. - explicit CompletionQueue(grpc_completion_queue* take); - - /// Destructor. Destroys the owned wrapped completion queue / instance. - ~CompletionQueue() { - ::grpc::g_core_codegen_interface->grpc_completion_queue_destroy(cq_); - } - - /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. - enum NextStatus { - SHUTDOWN, ///< The completion queue has been shutdown and fully-drained - GOT_EVENT, ///< Got a new event; \a tag will be filled in with its - ///< associated value; \a ok indicating its success. - TIMEOUT ///< deadline was reached. - }; - - /// Read from the queue, blocking until an event is available or the queue is - /// shutting down. - /// - /// \param tag [out] Updated to point to the read event's tag. - /// \param ok [out] true if read a successful event, false otherwise. - /// - /// Note that each tag sent to the completion queue (through RPC operations - /// or alarms) will be delivered out of the completion queue by a call to - /// Next (or a related method), regardless of whether the operation succeeded - /// or not. Success here means that this operation completed in the normal - /// valid manner. - /// - /// Server-side RPC request: \a ok indicates that the RPC has indeed - /// been started. If it is false, the server has been Shutdown - /// before this particular call got matched to an incoming RPC. - /// - /// Client-side StartCall/RPC invocation: \a ok indicates that the RPC is - /// going to go to the wire. If it is false, it not going to the wire. This - /// would happen if the channel is either permanently broken or - /// transiently broken but with the fail-fast option. (Note that async unary - /// RPCs don't post a CQ tag at this point, nor do client-streaming - /// or bidi-streaming RPCs that have the initial metadata corked option set.) - /// - /// Client-side Write, Client-side WritesDone, Server-side Write, - /// Server-side Finish, Server-side SendInitialMetadata (which is - /// typically included in Write or Finish when not done explicitly): - /// \a ok means that the data/metadata/status/etc is going to go to the - /// wire. If it is false, it not going to the wire because the call - /// is already dead (i.e., canceled, deadline expired, other side - /// dropped the channel, etc). - /// - /// Client-side Read, Server-side Read, Client-side - /// RecvInitialMetadata (which is typically included in Read if not - /// done explicitly): \a ok indicates whether there is a valid message - /// that got read. If not, you know that there are certainly no more - /// messages that can ever be read from this stream. For the client-side - /// operations, this only happens because the call is dead. For the - /// server-sider operation, though, this could happen because the client - /// has done a WritesDone already. - /// - /// Client-side Finish: \a ok should always be true - /// - /// Server-side AsyncNotifyWhenDone: \a ok should always be true - /// - /// Alarm: \a ok is true if it expired, false if it was canceled - /// - /// \return true if got an event, false if the queue is fully drained and - /// shut down. - bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, - ::grpc::g_core_codegen_interface->gpr_inf_future( - GPR_CLOCK_REALTIME)) != SHUTDOWN); - } - - /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). - /// Both \a tag and \a ok are updated upon success (if an event is available - /// within the \a deadline). A \a tag points to an arbitrary location usually - /// employed to uniquely identify an event. - /// - /// \param tag [out] Upon sucess, updated to point to the event's tag. - /// \param ok [out] Upon sucess, true if a successful event, false otherwise - /// See documentation for CompletionQueue::Next for explanation of ok - /// \param deadline [in] How long to block in wait for an event. - /// - /// \return The type of event read. - template - NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { - ::grpc::TimePoint deadline_tp(deadline); - return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); - } - - /// EXPERIMENTAL - /// First executes \a F, then reads from the queue, blocking up to - /// \a deadline (or the queue's shutdown). - /// Both \a tag and \a ok are updated upon success (if an event is available - /// within the \a deadline). A \a tag points to an arbitrary location usually - /// employed to uniquely identify an event. - /// - /// \param f [in] Function to execute before calling AsyncNext on this queue. - /// \param tag [out] Upon sucess, updated to point to the event's tag. - /// \param ok [out] Upon sucess, true if read a regular event, false - /// otherwise. - /// \param deadline [in] How long to block in wait for an event. - /// - /// \return The type of event read. - template - NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) { - CompletionQueueTLSCache cache = CompletionQueueTLSCache(this); - f(); - if (cache.Flush(tag, ok)) { - return GOT_EVENT; - } else { - return AsyncNext(tag, ok, deadline); - } - } - - /// Request the shutdown of the queue. - /// - /// \warning This method must be called at some point if this completion queue - /// is accessed with Next or AsyncNext. \a Next will not return false - /// until this method has been called and all pending tags have been drained. - /// (Likewise for \a AsyncNext returning \a NextStatus::SHUTDOWN .) - /// Only once either one of these methods does that (that is, once the queue - /// has been \em drained) can an instance of this class be destroyed. - /// Also note that applications must ensure that no work is enqueued on this - /// completion queue after this method is called. - void Shutdown(); - - /// Returns a \em raw pointer to the underlying \a grpc_completion_queue - /// instance. - /// - /// \warning Remember that the returned instance is owned. No transfer of - /// owership is performed. - grpc_completion_queue* cq() { return cq_; } - - protected: - /// Private constructor of CompletionQueue only visible to friend classes - CompletionQueue(const grpc_completion_queue_attributes& attributes) { - cq_ = ::grpc::g_core_codegen_interface->grpc_completion_queue_create( - ::grpc::g_core_codegen_interface->grpc_completion_queue_factory_lookup( - &attributes), - &attributes, NULL); - InitialAvalanching(); // reserve this for the future shutdown - } - - private: - // Friend synchronous wrappers so that they can access Pluck(), which is - // a semi-private API geared towards the synchronous implementation. - template - friend class ::grpc::ClientReader; - template - friend class ::grpc::ClientWriter; - template - friend class ::grpc::ClientReaderWriter; - template - friend class ::grpc::ServerReader; - template - friend class ::grpc::ServerWriter; - template - friend class ::grpc::internal::ServerReaderWriterBody; - template - friend class ::grpc::internal::RpcMethodHandler; - template - friend class ::grpc::internal::ClientStreamingHandler; - template - friend class ::grpc::internal::ServerStreamingHandler; - template - friend class ::grpc::internal::TemplatedBidiStreamingHandler; - template <::grpc::StatusCode code> - friend class ::grpc::internal::ErrorMethodHandler; - friend class ::grpc_impl::Server; - friend class ::grpc::ServerContext; - friend class ::grpc::ServerInterface; - template - friend class ::grpc::internal::BlockingUnaryCallImpl; - - // Friends that need access to constructor for callback CQ - friend class ::grpc_impl::Channel; - - // For access to Register/CompleteAvalanching - template - friend class ::grpc::internal::CallOpSet; - - /// EXPERIMENTAL - /// Creates a Thread Local cache to store the first event - /// On this completion queue queued from this thread. Once - /// initialized, it must be flushed on the same thread. - class CompletionQueueTLSCache { - public: - CompletionQueueTLSCache(CompletionQueue* cq); - ~CompletionQueueTLSCache(); - bool Flush(void** tag, bool* ok); - - private: - CompletionQueue* cq_; - bool flushed_; - }; - - NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); - - /// Wraps \a grpc_completion_queue_pluck. - /// \warning Must not be mixed with calls to \a Next. - bool Pluck(::grpc::internal::CompletionQueueTag* tag) { - auto deadline = - ::grpc::g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME); - while (true) { - auto ev = ::grpc::g_core_codegen_interface->grpc_completion_queue_pluck( - cq_, tag, deadline, nullptr); - bool ok = ev.success != 0; - void* ignored = tag; - if (tag->FinalizeResult(&ignored, &ok)) { - GPR_CODEGEN_ASSERT(ignored == tag); - return ok; - } - } - } - - /// Performs a single polling pluck on \a tag. - /// \warning Must not be mixed with calls to \a Next. - /// - /// TODO: sreek - This calls tag->FinalizeResult() even if the cq_ is already - /// shutdown. This is most likely a bug and if it is a bug, then change this - /// implementation to simple call the other TryPluck function with a zero - /// timeout. i.e: - /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) - void TryPluck(::grpc::internal::CompletionQueueTag* tag) { - auto deadline = - ::grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); - auto ev = ::grpc::g_core_codegen_interface->grpc_completion_queue_pluck( - cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT) return; - bool ok = ev.success != 0; - void* ignored = tag; - // the tag must be swallowed if using TryPluck - GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); - } - - /// Performs a single polling pluck on \a tag. Calls tag->FinalizeResult if - /// the pluck() was successful and returned the tag. - /// - /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects - /// that the tag is internal not something that is returned to the user. - void TryPluck(::grpc::internal::CompletionQueueTag* tag, - gpr_timespec deadline) { - auto ev = ::grpc::g_core_codegen_interface->grpc_completion_queue_pluck( - cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { - return; - } - - bool ok = ev.success != 0; - void* ignored = tag; - GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); - } - - /// Manage state of avalanching operations : completion queue tags that - /// trigger other completion queue operations. The underlying core completion - /// queue should not really shutdown until all avalanching operations have - /// been finalized. Note that we maintain the requirement that an avalanche - /// registration must take place before CQ shutdown (which must be maintained - /// elsehwere) - void InitialAvalanching() { - gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); - } - void RegisterAvalanching() { - gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, - static_cast(1)); - } - void CompleteAvalanching() { - if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, - static_cast(-1)) == 1) { - ::grpc::g_core_codegen_interface->grpc_completion_queue_shutdown(cq_); - } - } - - grpc_completion_queue* cq_; // owned - - gpr_atm avalanches_in_flight_; -}; - -/// A specific type of completion queue used by the processing of notifications -/// by servers. Instantiated by \a ServerBuilder. -class ServerCompletionQueue : public CompletionQueue { - public: - bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } - - protected: - /// Default constructor - ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} - - private: - /// \param completion_type indicates whether this is a NEXT or CALLBACK - /// completion queue. - /// \param polling_type Informs the GRPC library about the type of polling - /// allowed on this completion queue. See grpc_cq_polling_type's description - /// in grpc_types.h for more details. - /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues - ServerCompletionQueue(grpc_cq_completion_type completion_type, - grpc_cq_polling_type polling_type, - grpc_experimental_completion_queue_functor* shutdown_cb) - : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, completion_type, polling_type, - shutdown_cb}), - polling_type_(polling_type) {} - - grpc_cq_polling_type polling_type_; - friend class ::grpc_impl::ServerBuilder; - friend class ::grpc_impl::Server; -}; - -} // namespace grpc_impl - -#endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H diff --git a/include/grpcpp/impl/codegen/intercepted_channel.h b/include/grpcpp/impl/codegen/intercepted_channel.h index cd0fcc06753..5255a6d1472 100644 --- a/include/grpcpp/impl/codegen/intercepted_channel.h +++ b/include/grpcpp/impl/codegen/intercepted_channel.h @@ -21,10 +21,6 @@ #include -namespace grpc_impl { -class CompletionQueue; -} - namespace grpc { namespace internal { @@ -50,7 +46,7 @@ class InterceptedChannel : public ChannelInterface { : channel_(channel), interceptor_pos_(pos) {} Call CreateCall(const RpcMethod& method, ClientContext* context, - ::grpc_impl::CompletionQueue* cq) override { + CompletionQueue* cq) override { return channel_->CreateCallInternal(method, context, cq, interceptor_pos_); } @@ -62,8 +58,7 @@ class InterceptedChannel : public ChannelInterface { } void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline, - ::grpc_impl::CompletionQueue* cq, + gpr_timespec deadline, CompletionQueue* cq, void* tag) override { return channel_->NotifyOnStateChangeImpl(last_observed, deadline, cq, tag); } @@ -72,9 +67,7 @@ class InterceptedChannel : public ChannelInterface { return channel_->WaitForStateChangeImpl(last_observed, deadline); } - ::grpc_impl::CompletionQueue* CallbackCQ() override { - return channel_->CallbackCQ(); - } + CompletionQueue* CallbackCQ() override { return channel_->CallbackCQ(); } ChannelInterface* channel_; size_t interceptor_pos_; diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index fe5a0097d0a..f65598db41f 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -42,12 +42,13 @@ struct grpc_call; struct census_context; namespace grpc_impl { -class CompletionQueue; + class Server; } // namespace grpc_impl namespace grpc { class ClientContext; class GenericServerContext; +class CompletionQueue; class ServerInterface; template class ServerAsyncReader; @@ -89,7 +90,6 @@ class Call; class ServerReactor; } // namespace internal -class ServerInterface; namespace testing { class InteropServerContextInspector; class ServerContextTestSpouse; @@ -354,7 +354,7 @@ class ServerContext { gpr_timespec deadline_; grpc_call* call_; - ::grpc_impl::CompletionQueue* cq_; + CompletionQueue* cq_; bool sent_initial_metadata_; mutable std::shared_ptr auth_context_; mutable internal::MetadataMap client_metadata_; diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 9e96a66c8af..9299fdb20e1 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -29,14 +29,14 @@ #include namespace grpc_impl { -class CompletionQueue; -class ServerCompletionQueue; +class Channel; class ServerCredentials; } // namespace grpc_impl namespace grpc { class AsyncGenericService; class GenericServerContext; +class ServerCompletionQueue; class ServerContext; class Service; @@ -160,8 +160,7 @@ class ServerInterface : public internal::CallHook { /// caller is required to keep all completion queues live until the server is /// destroyed. /// \param num_cqs How many completion queues does \a cqs hold. - virtual void Start(::grpc_impl::ServerCompletionQueue** cqs, - size_t num_cqs) = 0; + virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; virtual void ShutdownInternal(gpr_timespec deadline) = 0; @@ -176,9 +175,9 @@ class ServerInterface : public internal::CallHook { public: BaseAsyncRequest(ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag, bool delete_on_finalize); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + bool delete_on_finalize); virtual ~BaseAsyncRequest(); bool FinalizeResult(void** tag, bool* status) override; @@ -190,8 +189,8 @@ class ServerInterface : public internal::CallHook { ServerInterface* const server_; ServerContext* const context_; internal::ServerAsyncStreamingInterface* const stream_; - ::grpc_impl::CompletionQueue* const call_cq_; - ::grpc_impl::ServerCompletionQueue* const notification_cq_; + CompletionQueue* const call_cq_; + ServerCompletionQueue* const notification_cq_; void* const tag_; const bool delete_on_finalize_; grpc_call* call_; @@ -205,17 +204,16 @@ class ServerInterface : public internal::CallHook { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag, const char* name, - internal::RpcMethod::RpcType type); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + const char* name, internal::RpcMethod::RpcType type); virtual bool FinalizeResult(void** tag, bool* status) override { /* If we are done intercepting, then there is nothing more for us to do */ if (done_intercepting_) { return BaseAsyncRequest::FinalizeResult(tag, status); } - call_wrapper_ = ::grpc::internal::Call( + call_wrapper_ = internal::Call( call_, server_, call_cq_, server_->max_receive_message_size(), context_->set_server_rpc_info(name_, type_, *server_->interceptor_creators())); @@ -224,7 +222,7 @@ class ServerInterface : public internal::CallHook { protected: void IssueRequest(void* registered_method, grpc_byte_buffer** payload, - ::grpc_impl::ServerCompletionQueue* notification_cq); + ServerCompletionQueue* notification_cq); const char* name_; const internal::RpcMethod::RpcType type_; }; @@ -234,9 +232,8 @@ class ServerInterface : public internal::CallHook { NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag) + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) : RegisteredAsyncRequest( server, context, stream, call_cq, notification_cq, tag, registered_method->name(), registered_method->method_type()) { @@ -252,9 +249,9 @@ class ServerInterface : public internal::CallHook { PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag, Message* request) + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + Message* request) : RegisteredAsyncRequest( server, context, stream, call_cq, notification_cq, tag, registered_method->name(), registered_method->method_type()), @@ -309,9 +306,9 @@ class ServerInterface : public internal::CallHook { ServerInterface* const server_; ServerContext* const context_; internal::ServerAsyncStreamingInterface* const stream_; - ::grpc_impl::CompletionQueue* const call_cq_; + CompletionQueue* const call_cq_; - ::grpc_impl::ServerCompletionQueue* const notification_cq_; + ServerCompletionQueue* const notification_cq_; void* const tag_; Message* const request_; ByteBuffer payload_; @@ -321,9 +318,9 @@ class ServerInterface : public internal::CallHook { public: GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag, bool delete_on_finalize); + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + bool delete_on_finalize); bool FinalizeResult(void** tag, bool* status) override; @@ -335,9 +332,9 @@ class ServerInterface : public internal::CallHook { void RequestAsyncCall(internal::RpcServiceMethod* method, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag, Message* message) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + Message* message) { GPR_CODEGEN_ASSERT(method); new PayloadAsyncRequest(method, this, context, stream, call_cq, notification_cq, tag, message); @@ -346,19 +343,18 @@ class ServerInterface : public internal::CallHook { void RequestAsyncCall(internal::RpcServiceMethod* method, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, - void* tag) { + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); new NoPayloadAsyncRequest(method, this, context, stream, call_cq, notification_cq, tag); } - void RequestAsyncGenericCall( - GenericServerContext* context, - internal::ServerAsyncStreamingInterface* stream, - ::grpc_impl::CompletionQueue* call_cq, - ::grpc_impl::ServerCompletionQueue* notification_cq, void* tag) { + void RequestAsyncGenericCall(GenericServerContext* context, + internal::ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) { new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, tag, true); } @@ -383,7 +379,7 @@ class ServerInterface : public internal::CallHook { // Returns nullptr (rather than being pure) since this is a post-1.0 method // and adding a new pure method to an interface would be a breaking change // (even though this is private and non-API) - virtual ::grpc_impl::CompletionQueue* CallbackCQ() { return nullptr; } + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index 0e0fc8d5aa7..1d94abe12dd 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -32,7 +32,9 @@ class Server; } // namespace grpc_impl namespace grpc { +class CompletionQueue; class ServerInterface; +class ServerCompletionQueue; class ServerContext; namespace internal { diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index d9ec7c42f3d..89c4eba1d95 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -21,6 +21,13 @@ #include +namespace grpc_impl { + +class Server; +class ServerCredentials; +class ResourceQuota; +} // namespace grpc_impl + namespace grpc { typedef ::grpc_impl::ServerBuilder ServerBuilder; diff --git a/include/grpcpp/server_builder_impl.h b/include/grpcpp/server_builder_impl.h index be21ed024ea..f7624deba5f 100644 --- a/include/grpcpp/server_builder_impl.h +++ b/include/grpcpp/server_builder_impl.h @@ -38,14 +38,14 @@ struct grpc_resource_quota; namespace grpc_impl { -class CompletionQueue; class ResourceQuota; -class ServerCompletionQueue; class ServerCredentials; } // namespace grpc_impl namespace grpc { class AsyncGenericService; +class CompletionQueue; +class ServerCompletionQueue; class Service; namespace testing { diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 19837d2c264..6b53b3f64ce 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -147,10 +147,10 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, printer->Print(vars, "\n"); printer->Print(vars, "namespace grpc_impl {\n"); printer->Print(vars, "class Channel;\n"); - printer->Print(vars, "class CompletionQueue;\n"); - printer->Print(vars, "class ServerCompletionQueue;\n"); printer->Print(vars, "} // namespace grpc_impl\n\n"); printer->Print(vars, "namespace grpc {\n"); + printer->Print(vars, "class CompletionQueue;\n"); + printer->Print(vars, "class ServerCompletionQueue;\n"); printer->Print(vars, "class ServerContext;\n"); printer->Print(vars, "} // namespace grpc\n\n"); diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 43c2eee96f8..4bb3bcbd8b6 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -24,9 +24,9 @@ #include #include -namespace grpc_impl { +namespace grpc { -static ::grpc::internal::GrpcLibraryInitializer g_gli_initializer; +static internal::GrpcLibraryInitializer g_gli_initializer; // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create @@ -52,8 +52,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto core_cq_tag = - static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag); + auto core_cq_tag = static_cast(ev.tag); *ok = ev.success != 0; *tag = core_cq_tag; if (core_cq_tag->FinalizeResult(tag, ok)) { @@ -80,8 +79,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto core_cq_tag = - static_cast<::grpc::internal::CompletionQueueTag*>(res_tag); + auto core_cq_tag = static_cast(res_tag); *ok = res == 1; if (core_cq_tag->FinalizeResult(tag, ok)) { return true; @@ -90,4 +88,4 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { return false; } -} // namespace grpc_impl +} // namespace grpc diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 5dc00a85c3a..9202585535b 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -42,11 +42,11 @@ namespace grpc_impl { class Channel; -class CompletionQueue; -class ServerCompletionQueue; } // namespace grpc_impl namespace grpc { +class CompletionQueue; +class ServerCompletionQueue; class ServerContext; } // namespace grpc diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 885c1427247..8659a7d7381 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -959,7 +959,6 @@ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ -include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/config_protobuf.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index e87914569ce..65369aa0e43 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -960,7 +960,6 @@ include/grpcpp/impl/codegen/client_context.h \ include/grpcpp/impl/codegen/client_interceptor.h \ include/grpcpp/impl/codegen/client_unary_call.h \ include/grpcpp/impl/codegen/completion_queue.h \ -include/grpcpp/impl/codegen/completion_queue_impl.h \ include/grpcpp/impl/codegen/completion_queue_tag.h \ include/grpcpp/impl/codegen/config.h \ include/grpcpp/impl/codegen/config_protobuf.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 1782e18eb1d..27b2909a690 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -9909,7 +9909,6 @@ "include/grpcpp/impl/codegen/client_interceptor.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", - "include/grpcpp/impl/codegen/completion_queue_impl.h", "include/grpcpp/impl/codegen/completion_queue_tag.h", "include/grpcpp/impl/codegen/config.h", "include/grpcpp/impl/codegen/core_codegen_interface.h", @@ -9986,7 +9985,6 @@ "include/grpcpp/impl/codegen/client_interceptor.h", "include/grpcpp/impl/codegen/client_unary_call.h", "include/grpcpp/impl/codegen/completion_queue.h", - "include/grpcpp/impl/codegen/completion_queue_impl.h", "include/grpcpp/impl/codegen/completion_queue_tag.h", "include/grpcpp/impl/codegen/config.h", "include/grpcpp/impl/codegen/core_codegen_interface.h",